package com.mapr.streams.tests.producer;

import com.mapr.db.tests.utils.DBTests;
import com.mapr.streams.Admin;
import com.mapr.streams.Streams;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster test that drives an idempotent Kafka producer against a stream while
 * toggling options of the external {@code ted} command, then checks that the
 * consumer sees exactly the expected number of messages.
 *
 * <p>The {@code ted} binary is looked up under the source root reported by
 * {@link BaseTest#getSourceRoot()}. It is invoked with {@code enable},
 * {@code disable} and {@code enableOnce} sub-commands (presumably a
 * fault-injection tool -- confirm against the fileserver build). When the
 * binary cannot be found, setup and the test fail with an {@link IOException}.
 */
@Category({ClusterTest.class})
public class ProducerIdempotenceSnapshotPreInsertFailureTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerIdempotenceSnapshotPreInsertFailureTest.class);
    private static final String PREFIX = "/jtest-" + ProducerIdempotenceSnapshotPreInsertFailureTest.class.getSimpleName() + "-";
    /** Full path of the stream created in setup and deleted in cleanup. */
    private static final String STREAM_NAME = PREFIX + "idempotentproducerpreinsertfailure";
    /** Location of the ted binary, relative to the source root. */
    private static final String TED_FILE = "build_fileserver/fs/common/ted/ted";
    private static final int numParts = 1;
    // Argument strings appended verbatim to the ted binary path (note the leading space).
    private static final String SET_TED_CMD = " enable IdempotentRequestDrop";
    private static final String RESET_TED_CMD = " disable IdempotentRequestDrop";
    private static final String SET_DURABLE_TED_CMD = " enable PutAlwaysDurable";
    private static final String RESET_DURABLE_TED_CMD = " disable PutAlwaysDurable";

    private static final Path repoRootPath = BaseTest.getSourceRoot();
    /** Path of the ted binary, or {@code null} when it does not exist. */
    private static final Path tedCmdPath = locateTedCmd(repoRootPath);
    private static Admin madmin;

    /**
     * Resolves {@link #TED_FILE} against the given root.
     *
     * @param root source root, may be {@code null}
     * @return the resolved path if the file exists, otherwise {@code null}
     */
    private static Path locateTedCmd(Path root) {
        if (root == null) {
            return null;
        }
        Path candidate = root.resolve(TED_FILE);
        return Files.exists(candidate) ? candidate : null;
    }

    /**
     * Returns the ted binary path or fails with a descriptive error.
     *
     * @throws IOException if the ted binary was not found at class load time
     */
    private static Path requireTedCmd() throws IOException {
        if (tedCmdPath == null) {
            throw new IOException("Could not find ted command file.");
        }
        return tedCmdPath;
    }

    /**
     * Runs the ted binary with the given argument string and checks its exit status.
     *
     * @param args arguments appended verbatim to the binary path; must start with a space
     * @throws IOException if the binary is missing or exits with a non-zero status
     * @throws Exception whatever {@code DBTests.ExecuteShellCmdAndGetReturnCode} throws
     */
    private static void runTedCmd(String args) throws Exception {
        String cmd = requireTedCmd().toString() + args;
        int exitStatus = DBTests.ExecuteShellCmdAndGetReturnCode(cmd);
        if (exitStatus != 0) {
            throw new IOException(cmd + " failed with exitStatus: " + exitStatus);
        }
    }

    /** Best-effort deletion of the test stream; failures are logged and ignored. */
    private static void deleteStreamQuietly() {
        if (madmin == null) {
            return;
        }
        try {
            madmin.deleteStream(STREAM_NAME);
        } catch (Exception e) {
            // The stream may simply not exist (first run, or setup failed before creating it).
            _logger.debug("Ignoring failure to delete stream {}", STREAM_NAME, e);
        }
    }

    /**
     * Sleeps for the given time. An interruption is reported on stdout and
     * otherwise ignored, so the ted enable/disable sequence that follows the
     * sleep still runs to completion.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            System.out.println("Sleep interrupted " + e);
        }
    }

    /**
     * Recreates the test stream and makes sure {@code IdempotentRequestDrop} is disabled.
     *
     * @throws IOException if the ted binary is missing or the reset command fails
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        // Fail fast with a clear message instead of a NullPointerException later on.
        requireTedCmd();
        madmin = Streams.newAdmin(new Configuration());
        deleteStreamQuietly();
        madmin.createStream(STREAM_NAME, Streams.newStreamDescriptor());
        runTedCmd(RESET_TED_CMD);
    }

    /**
     * Deletes the test stream and disables both ted options used by this test.
     * Both reset commands are always attempted; the first failure is rethrown
     * with any later failure attached as suppressed.
     */
    @AfterClass
    public static void cleanupTest() throws Exception {
        deleteStreamQuietly();
        if (tedCmdPath == null) {
            // Nothing could have been enabled without the binary.
            return;
        }
        Exception failure = null;
        for (String resetCmd : new String[] {RESET_TED_CMD, RESET_DURABLE_TED_CMD}) {
            try {
                runTedCmd(resetCmd);
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Sends messages on a background thread while toggling ted options.
     *
     * <p>{@code PutAlwaysDurable} is enabled first. If {@code onceFaultName} is
     * given, that option is armed with {@code enableOnce} before sending and
     * disabled about three seconds after the sender starts. Otherwise
     * {@code IdempotentRequestDrop} is switched on for one second and off for
     * 100 ms, ten times. The method returns once the sender thread has finished.
     *
     * @param topic          full topic name ({@code stream:topic})
     * @param producer       producer handed to the sender
     * @param numMsgs        message count passed to the sender (appears to be per partition -- confirm)
     * @param numPartitions  partition count passed to the sender
     * @param senderSizeArg  forwarded to {@code SendMessagesToProducer}; presumably a size in bytes -- confirm
     * @param senderFlag     forwarded to {@code SendMessagesToProducer}; meaning not visible here
     * @param onceFaultName  ted option to arm once, or {@code null} for the repeated drop cycle
     * @throws IOException if any ted command exits with a non-zero status
     */
    private void produceMsgs(String topic, KafkaProducer<byte[], byte[]> producer, int numMsgs, int numPartitions, int senderSizeArg, boolean senderFlag, String onceFaultName) throws Exception {
        SendMessagesToProducer sender = new SendMessagesToProducer(
                producer,
                new ProducerMultiTest.CountCallback(numMsgs * numParts),
                topic,
                numPartitions,
                numMsgs,
                senderSizeArg,
                senderFlag);

        runTedCmd(SET_DURABLE_TED_CMD);
        if (onceFaultName != null) {
            runTedCmd(" enableOnce " + onceFaultName);
        }

        Thread senderThread = new Thread(sender);
        senderThread.start();
        // Give the sender time to get going before touching the ted options again.
        pause(3000L);

        if (onceFaultName != null) {
            runTedCmd(" disable " + onceFaultName);
        } else {
            for (int round = 0; round < 10; round++) {
                runTedCmd(SET_TED_CMD);
                pause(1000L);
                runTedCmd(RESET_TED_CMD);
                pause(100L);
            }
        }
        senderThread.join();
    }

    /**
     * Subscribes to the topic and polls until at least
     * {@code numMsgs * numPartitions} records were fetched, then asserts that
     * exactly that many arrived (an overshoot means duplicates were delivered).
     *
     * <p>NOTE(review): the poll loop has no upper time bound; if records are
     * lost this method never returns and relies on an external test timeout.
     *
     * @param topic          full topic name ({@code stream:topic})
     * @param consumer       consumer to subscribe and poll with
     * @param numMsgs        expected message count per partition
     * @param numPartitions  number of partitions
     */
    private void consumeMsgs(String topic, KafkaConsumer<byte[], byte[]> consumer, int numMsgs, int numPartitions) throws Exception {
        ArrayList<String> topics = new ArrayList<>();
        topics.add(topic);
        consumer.subscribe(topics);
        pause(100L);

        final int expected = numMsgs * numPartitions;
        int fetched = 0;
        while (fetched < expected) {
            fetched += consumer.poll(0L).count();
            System.err.println("messages fetched count " + fetched);
        }
        Assert.assertEquals(expected, fetched);
    }

    /**
     * Produces 500 messages under the repeated {@code IdempotentRequestDrop}
     * cycle, then 500 more with {@code IdempotentSnapshotPreInsertCrash} armed
     * once, and verifies that exactly 1000 messages can be consumed.
     */
    @Test
    public void testProducerIdempotenceSnapshotPreInsertFailure() throws Exception {
        requireTedCmd();
        final String topic = STREAM_NAME + ":t";
        final int msgsPerRound = 500;

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("enable.idempotence", true);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", false);

        // try/finally so both clients are released even when a step in between throws.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
        try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
            try {
                madmin.createTopic(STREAM_NAME, "t", numParts);
                produceMsgs(topic, producer, msgsPerRound, numParts, 2097152, false, null);
                pause(5000L);
                produceMsgs(topic, producer, msgsPerRound, numParts, 2097152, false, "IdempotentSnapshotPreInsertCrash");
                consumeMsgs(topic, consumer, msgsPerRound * 2, numParts);
                consumer.unsubscribe();
            } finally {
                consumer.close();
            }
        } finally {
            producer.close();
        }
    }
}
