package com.mapr.streams.tests.producer;

import com.mapr.db.tests.utils.DBTests;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster stress test for producers created with {@code enable.idempotence=true}.
 *
 * <p>The test creates one stream and one topic, runs several rounds of concurrent
 * producers while repeatedly toggling the ted {@code IdempotentRequestDrop} setting,
 * runs one more round during which the {@code mapr-warden} service is restarted, and
 * finally checks that the number of records read back from the topic equals the
 * expected total.
 *
 * <p>The ted binary is looked up under the source root reported by
 * {@link BaseTest#getSourceRoot()}; the test fails with an {@link IOException} when it
 * cannot be found.
 */
@Category({ClusterTest.class})
public class ProducerIdempotenceMfsRestartStressTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerIdempotenceMfsRestartStressTest.class);
    private static final String PREFIX = "/jtest-" + ProducerIdempotenceMfsRestartStressTest.class.getSimpleName() + "-";
    /** Path of the stream used by every step of this test. */
    private static final String STREAM_NAME = PREFIX + "idempotentproducerstress";
    /** Location of the ted binary, relative to the source root. */
    private static final String TED_FILE = "build_fileserver/fs/common/ted/ted";
    private static final int numParts = 1;
    private static final String SET_TED_CMD = " enable IdempotentRequestDrop";
    private static final String RESET_TED_CMD = " disable IdempotentRequestDrop";
    /** Timeout handed to each {@code poll} call while draining the topic. */
    private static final long POLL_TIMEOUT_MS = 100L;
    /** Longest period without any new record before the consume step gives up. */
    private static final long CONSUME_STALL_LIMIT_MS = 5L * 60L * 1000L;

    private static final Path repoRootPath = BaseTest.getSourceRoot();
    /** Absolute path of the ted binary, or {@code null} when it does not exist. */
    private static final Path tedCmdPath = locateTedCommand(repoRootPath);
    private static Admin madmin;

    /**
     * Resolves {@link #TED_FILE} against the given source root.
     *
     * @param sourceRoot source root to resolve against; may be {@code null}
     * @return the resolved path if the file exists, otherwise {@code null}
     */
    private static Path locateTedCommand(Path sourceRoot) {
        if (sourceRoot == null) {
            return null;
        }
        Path candidate = sourceRoot.resolve(TED_FILE);
        return Files.exists(candidate) ? candidate : null;
    }

    /**
     * Creates the admin handle and a fresh stream with compression set to {@code "off"}.
     * Any stream left over under the same name is deleted first.
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        deleteStreamQuietly();
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setCompressionAlgo("off");
        madmin.createStream(STREAM_NAME, descriptor);
    }

    /**
     * Disables {@code IdempotentRequestDrop} (when the ted binary is available) and
     * deletes the test stream. The stream is deleted even if the ted command fails.
     *
     * @throws IOException if the ted command exits with a non-zero status
     */
    @AfterClass
    public static void cleanupTest() throws Exception {
        try {
            // Without a ted binary nothing was ever enabled, so there is nothing to reset.
            if (tedCmdPath != null) {
                runTedCommand(RESET_TED_CMD);
            }
        } finally {
            deleteStreamQuietly();
        }
    }

    /** Best-effort deletion of the test stream; failures are logged and otherwise ignored. */
    private static void deleteStreamQuietly() {
        if (madmin == null) {
            return;
        }
        try {
            madmin.deleteStream(STREAM_NAME);
        } catch (Exception ignored) {
            // Deliberately ignored: deletion is only cleanup and must not fail the test.
            _logger.debug("Ignoring failure to delete stream {}", STREAM_NAME, ignored);
        }
    }

    /**
     * Runs the ted binary with the given argument string.
     *
     * @param tedArgs arguments appended verbatim to the ted path (including the leading space)
     * @throws IOException if the ted binary was not found or exits with a non-zero status
     */
    private static void runTedCommand(String tedArgs) throws Exception {
        if (tedCmdPath == null) {
            throw new IOException("Could not find ted command file.");
        }
        String cmd = tedCmdPath.toString() + tedArgs;
        int exitStatus = DBTests.ExecuteShellCmdAndGetReturnCode(cmd);
        if (exitStatus != 0) {
            throw new IOException(cmd + " failed with exitStatus: " + exitStatus);
        }
    }

    /**
     * Runs an external command and waits for it to finish. A non-zero exit status is
     * logged as a warning but does not fail the caller.
     */
    private static void runCommand(String... command) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        int exitStatus = builder.start().waitFor();
        if (exitStatus != 0) {
            _logger.warn("{} exited with status {}", builder.command(), exitStatus);
        }
    }

    /**
     * Sleeps for the given time. An interrupt ends the sleep early, is logged, and the
     * thread's interrupt flag is restored so that later blocking calls can observe it.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            _logger.warn("Sleep interrupted", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Builds the producer configuration: byte-array serializers with idempotence enabled. */
    private static Properties idempotentProducerProps() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("enable.idempotence", true);
        return props;
    }

    /**
     * Creates {@code numProducers} senders (each with its own producer), enables
     * {@code IdempotentRequestDrop}, and then starts one thread per sender.
     *
     * <p>NOTE(review): the meaning of {@code numMsgs}, {@code numPartitions} and
     * {@code msgSize} is inferred from the call sites; they are forwarded unchanged to
     * {@code SendMessagesToProducer} — confirm against that class.
     *
     * @return the started threads, in creation order
     */
    private Thread[] launchProducers(String topic, int numMsgs, int numPartitions, int msgSize, int numProducers)
            throws Exception {
        ProducerMultiTest.CountCallback countCallback = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        Properties producerProps = idempotentProducerProps();
        SendMessagesToProducer[] senders = new SendMessagesToProducer[numProducers];
        for (int p = 0; p < numProducers; p++) {
            // Raw type kept on purpose: the generic signature expected by
            // SendMessagesToProducer is not visible from this file.
            // NOTE(review): producers are not closed here — verify SendMessagesToProducer closes them.
            senders[p] = new SendMessagesToProducer(new KafkaProducer(producerProps), countCallback, topic,
                    numPartitions, numMsgs, msgSize);
        }
        runTedCommand(SET_TED_CMD);
        Thread[] workers = new Thread[numProducers];
        for (int p = 0; p < numProducers; p++) {
            workers[p] = new Thread(senders[p]);
            workers[p].start();
        }
        return workers;
    }

    /** Waits for every thread in the array to terminate. */
    private static void joinAll(Thread[] workers) throws InterruptedException {
        for (Thread worker : workers) {
            worker.join();
        }
    }

    /**
     * Runs {@code i4} concurrent producers against {@code str} while toggling
     * {@code IdempotentRequestDrop} ten times (enabled for 3 s, disabled for 100 ms),
     * then waits for all producers to finish.
     *
     * @param str topic the producers write to
     * @param i   forwarded to the senders; presumably the message count per sender — confirm
     * @param i2  forwarded to the senders; presumably the partition count — confirm
     * @param i3  forwarded to the senders; presumably the message size — confirm
     * @param i4  number of producers / threads to run
     * @throws IOException if a ted command exits with a non-zero status
     */
    private void produceMsgsWithErrorInjection(String str, int i, int i2, int i3, int i4) throws Exception {
        Thread[] workers = launchProducers(str, i, i2, i3, i4);
        for (int cycle = 0; cycle < 10; cycle++) {
            runTedCommand(SET_TED_CMD);
            pause(3000L);
            runTedCommand(RESET_TED_CMD);
            pause(100L);
        }
        joinAll(workers);
    }

    /**
     * Runs {@code i4} concurrent producers against {@code str}, restarts the
     * {@code mapr-warden} service one second after they start, then waits for all
     * producers to finish.
     *
     * @param str topic the producers write to
     * @param i   forwarded to the senders; presumably the message count per sender — confirm
     * @param i2  forwarded to the senders; presumably the partition count — confirm
     * @param i3  forwarded to the senders; presumably the message size — confirm
     * @param i4  number of producers / threads to run
     */
    private void produceMsgsWithMfsRestart(String str, int i, int i2, int i3, int i4) throws Exception {
        Thread[] workers = launchProducers(str, i, i2, i3, i4);
        pause(1000L);
        runCommand("sudo", "service", "mapr-warden", "restart");
        joinAll(workers);
    }

    /**
     * Subscribes the consumer to {@code str} and polls until at least {@code i * i2}
     * records have been received, then asserts that exactly that many were read (a
     * final batch that overshoots the expected total fails the assertion).
     *
     * <p>The test fails if no record arrives for {@link #CONSUME_STALL_LIMIT_MS}
     * milliseconds, instead of polling forever.
     *
     * @param str           topic to read from
     * @param kafkaConsumer consumer to use; it is left subscribed on return
     * @param i             first factor of the expected record count
     * @param i2            second factor of the expected record count
     */
    private void consumeMsgs(String str, KafkaConsumer<byte[], byte[]> kafkaConsumer, int i, int i2) throws Exception {
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(str);
        kafkaConsumer.subscribe(topics);
        pause(100L);

        int expected = i * i2;
        int received = 0;
        long lastProgressNanos = System.nanoTime();
        while (received < expected) {
            int batch = kafkaConsumer.poll(POLL_TIMEOUT_MS).count();
            if (batch > 0) {
                received += batch;
                lastProgressNanos = System.nanoTime();
            } else if ((System.nanoTime() - lastProgressNanos) / 1000000L > CONSUME_STALL_LIMIT_MS) {
                Assert.fail("No records received for " + CONSUME_STALL_LIMIT_MS + " ms; got " + received
                        + " of " + expected + " expected records");
            }
        }
        Assert.assertEquals(expected, received);
    }

    /**
     * Runs four error-injection rounds followed by one warden-restart round, then
     * verifies that {@code 100 * 500} records can be read back from the topic.
     *
     * @throws IOException if the ted binary could not be located
     */
    @Test
    public void testStressWithMfsRestart() throws Exception {
        if (tedCmdPath == null) {
            throw new IOException("Could not find ted command file.");
        }
        runCommand("maprcli", "table", "edit", "-path", STREAM_NAME, "-regionsizemb", "256");

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", false);

        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
        try {
            madmin.createTopic(STREAM_NAME, "t", numParts);
            String topic = STREAM_NAME + ":t";

            // (500 / 100) rounds in total; the last one is the warden-restart round below.
            int errorInjectionRounds = (500 / 100) - 1;
            for (int round = 0; round < errorInjectionRounds; round++) {
                produceMsgsWithErrorInjection(topic, 100, numParts, 10, 100);
            }
            pause(5000L);
            produceMsgsWithMfsRestart(topic, 100, numParts, 1024, 100);

            consumeMsgs(topic, kafkaConsumer, 100 * 500, numParts);
            kafkaConsumer.unsubscribe();
        } finally {
            // Always release the consumer, even when a step above throws.
            kafkaConsumer.close();
        }
    }
}
