/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.producer;

import com.mapr.db.tests.utils.DBTests;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster tests that exercise an idempotent Kafka producer against streams while
 * the {@code IdempotentRequestDrop} fault of the TED command is toggled on and
 * off, then verify that exactly the expected number of records can be consumed.
 *
 * <p>The tests need the TED binary ({@link #TED_FILE}) under the source root
 * reported by {@link BaseTest#getSourceRoot()}; when it is missing every test
 * fails fast with an {@link IOException}.
 *
 * <p>NOTE(review): the exact server-side effect of {@code IdempotentRequestDrop}
 * is not visible from this file; presumably it drops produce requests so the
 * client has to retry — confirm against the TED documentation.
 */
@Category(ClusterTest.class)
public class ProducerIdempotenceTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerIdempotenceTest.class);

    private static final String PREFIX = "/jtest-" + ProducerIdempotenceTest.class.getSimpleName() + "-";
    private static final String STREAM_BASIC = PREFIX + "idempotentproducer";
    private static final String STREAM_BUCKET = PREFIX + "idempotentproducerbucket";
    private static final String STREAM_TABLET_SPLIT = PREFIX + "idempotentproducerwithtabletsplit";
    private static final String TOPIC = "t";

    private static final String TED_FILE = "build_fileserver/fs/common/ted/ted";
    private static final String SET_TED_CMD = " enable IdempotentRequestDrop";
    private static final String RESET_TED_CMD = " disable IdempotentRequestDrop";

    /** Number of partitions created for every test topic. */
    private static final int numParts = 4;

    /** Number of enable/disable cycles performed while the producer thread runs. */
    private static final int FAULT_CYCLES = 10;
    /** Time the fault stays enabled in each cycle. */
    private static final long FAULT_ON_MS = 1000L;
    /** Time the fault stays disabled in each cycle. */
    private static final long FAULT_OFF_MS = 100L;
    /** Pause after subscribing before the first poll. */
    private static final long SUBSCRIBE_SETTLE_MS = 100L;
    /** Blocking time of a single poll; avoids spinning on an empty topic. */
    private static final long POLL_TIMEOUT_MS = 1000L;
    /** The consume loop fails if no record arrives for this long. */
    private static final long CONSUME_IDLE_TIMEOUT_MS = 300000L;

    private static final int TWO_MIB = 2 * 1024 * 1024;
    private static final int TEN_MIB = 10 * 1024 * 1024;

    private static final Path repoRootPath;
    /** Path of the TED binary, or {@code null} when it could not be located. */
    private static final Path tedCmdPath;
    private static Admin madmin;

    static {
        Path root = BaseTest.getSourceRoot();
        Path ted = null;
        if (root != null) {
            Path candidate = root.resolve(TED_FILE);
            if (Files.exists(candidate)) {
                ted = candidate;
            }
        }
        repoRootPath = root;
        tedCmdPath = ted;
    }

    /**
     * Creates the admin client and (re)creates the three streams used by the tests.
     * The tablet-split stream is created with compression switched off.
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin(conf);

        recreateStream(STREAM_BASIC, Streams.newStreamDescriptor());
        recreateStream(STREAM_BUCKET, Streams.newStreamDescriptor());

        StreamDescriptor uncompressed = Streams.newStreamDescriptor();
        uncompressed.setCompressionAlgo("off");
        recreateStream(STREAM_TABLET_SPLIT, uncompressed);
    }

    /**
     * Deletes the test streams, disables the TED fault and releases the admin client.
     *
     * @throws IOException if the TED disable command exits with a non-zero status
     */
    @AfterClass
    public static void cleanupTest() throws Exception {
        try {
            deleteStreamQuietly(STREAM_BASIC);
            deleteStreamQuietly(STREAM_BUCKET);
            deleteStreamQuietly(STREAM_TABLET_SPLIT);
            // Without a TED binary nothing could have been enabled, and there is
            // no command to run; skipping avoids a NullPointerException here.
            if (tedCmdPath != null) {
                runTed(RESET_TED_CMD);
            }
        } finally {
            if (madmin != null) {
                try {
                    madmin.close();
                } catch (Exception e) {
                    _logger.warn("Failed to close streams admin client", e);
                }
                madmin = null;
            }
        }
    }

    /** Deletes {@code streamPath} if present, then creates it with {@code descriptor}. */
    private static void recreateStream(String streamPath, StreamDescriptor descriptor) throws Exception {
        deleteStreamQuietly(streamPath);
        madmin.createStream(streamPath, descriptor);
    }

    /** Best-effort delete: the stream may legitimately not exist, so failures are only logged. */
    private static void deleteStreamQuietly(String streamPath) {
        try {
            madmin.deleteStream(streamPath);
        } catch (Exception e) {
            _logger.debug("Ignoring failure to delete stream {}", streamPath, e);
        }
    }

    /** Fails the calling test when the TED binary was not found at class-load time. */
    private static void requireTed() throws IOException {
        if (tedCmdPath == null) {
            throw new IOException("Could not find ted command file.");
        }
    }

    /**
     * Runs the TED binary with the given argument suffix.
     *
     * @throws IOException if the command exits with a non-zero status
     */
    private static void runTed(String tedArgs) throws Exception {
        String cmd = tedCmdPath.toString() + tedArgs;
        int exitStatus = DBTests.ExecuteShellCmdAndGetReturnCode(cmd);
        if (exitStatus != 0) {
            throw new IOException(cmd + " failed with exitStatus: " + exitStatus);
        }
    }

    /**
     * Runs {@code sender} on its own thread while cycling the TED fault on and off
     * {@link #FAULT_CYCLES} times, then waits for the sender thread to finish.
     *
     * <p>If toggling fails or the calling thread is interrupted, the fault is
     * disabled on a best-effort basis before the exception propagates, so the
     * still-running sender and later tests are not left with the fault enabled.
     */
    private void runProducerUnderFaultInjection(SendMessagesToProducer sender) throws Exception {
        Thread producerThread = new Thread(sender);
        boolean cyclesCompleted = false;
        try {
            // Enable the fault before the first record is sent.
            runTed(SET_TED_CMD);
            producerThread.start();
            for (int cycle = 0; cycle < FAULT_CYCLES; ++cycle) {
                runTed(SET_TED_CMD);
                Thread.sleep(FAULT_ON_MS);
                runTed(RESET_TED_CMD);
                Thread.sleep(FAULT_OFF_MS);
            }
            cyclesCompleted = true;
        } finally {
            if (!cyclesCompleted) {
                try {
                    runTed(RESET_TED_CMD);
                } catch (Exception e) {
                    _logger.warn("Could not disable TED fault after a failed run", e);
                }
            }
        }
        producerThread.join();
    }

    /**
     * Sends {@code numMsgs} messages of {@code msgSize} bytes per partition through
     * {@code kafkaproducer} while the TED fault is being toggled.
     */
    private void produceMsgs(String topicName, KafkaProducer<byte[], byte[]> kafkaproducer, int numMsgs, int numPartitions, int msgSize, boolean shouldClose) throws Exception {
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs);
        SendMessagesToProducer sender = new SendMessagesToProducer(kafkaproducer, cb, topicName, numPartitions, numMsgs, msgSize, shouldClose);
        runProducerUnderFaultInjection(sender);
    }

    /**
     * Subscribes to {@code topicName} and polls until {@code numMsgs * numPartitions}
     * records have been received, then asserts the total matches exactly (a larger
     * total means the last poll returned more records than were expected).
     *
     * <p>Fails instead of hanging when no record arrives for
     * {@link #CONSUME_IDLE_TIMEOUT_MS}.
     */
    private void consumeMsgs(String topicName, KafkaConsumer<byte[], byte[]> kafkaconsumer, int numMsgs, int numPartitions) throws Exception {
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(topicName);
        kafkaconsumer.subscribe(topics);
        Thread.sleep(SUBSCRIBE_SETTLE_MS);

        int expected = numMsgs * numPartitions;
        int count = 0;
        long idleLimitNanos = CONSUME_IDLE_TIMEOUT_MS * 1000000L;
        long lastProgressNanos = System.nanoTime();
        while (count < expected) {
            ConsumerRecords<byte[], byte[]> recs = kafkaconsumer.poll(POLL_TIMEOUT_MS);
            int received = recs.count();
            if (received > 0) {
                count += received;
                lastProgressNanos = System.nanoTime();
            } else if (System.nanoTime() - lastProgressNanos > idleLimitNanos) {
                Assert.fail("Timed out consuming from " + topicName + ": received " + count + " of " + expected
                        + " records and none in the last " + CONSUME_IDLE_TIMEOUT_MS + " ms");
            }
        }
        Assert.assertEquals((long) expected, (long) count);
    }

    /**
     * Sends {@code numMsgs} default-sized messages per partition while the TED fault
     * is being toggled, then consumes and counts them via
     * {@link #consumeMsgs(String, KafkaConsumer, int, int)}.
     */
    private void produceAndConsumeMsgs(String topicName, KafkaProducer<byte[], byte[]> kafkaproducer, KafkaConsumer<byte[], byte[]> kafkaconsumer, int numMsgs, int numPartitions) throws Exception {
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs);
        SendMessagesToProducer sender = new SendMessagesToProducer(kafkaproducer, cb, topicName, numPartitions, numMsgs);
        runProducerUnderFaultInjection(sender);
        consumeMsgs(topicName, kafkaconsumer, numMsgs, numPartitions);
    }

    /** Builds a byte-array producer with {@code enable.idempotence} switched on. */
    private static KafkaProducer<byte[], byte[]> newIdempotentProducer() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("enable.idempotence", Boolean.TRUE);
        return new KafkaProducer<byte[], byte[]>(props);
    }

    /** Consumer settings shared by all tests: read from the earliest offset, no auto-commit. */
    private static Properties consumerProps() {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("fetch.min.bytes", "1");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", Boolean.FALSE);
        return props;
    }

    /** Produces 1000 messages per partition under fault injection and checks the consumed count. */
    @Test
    public void testProducerIdempotence() throws Exception {
        requireTed();
        // try-with-resources closes the consumer first, then the producer, even on failure.
        try (KafkaProducer<byte[], byte[]> kafkaproducer = newIdempotentProducer();
             KafkaConsumer<byte[], byte[]> kafkaconsumer = new KafkaConsumer<byte[], byte[]>(consumerProps())) {
            madmin.createTopic(STREAM_BASIC, TOPIC, numParts);
            int numMsgs = 1000;
            produceAndConsumeMsgs(STREAM_BASIC + ":" + TOPIC, kafkaproducer, kafkaconsumer, numMsgs, numParts);
            kafkaconsumer.unsubscribe();
        }
    }

    /**
     * Produces two batches of 500 two-MiB messages per partition, five seconds
     * apart, with the same producer, then checks that both batches are consumed.
     */
    @Test
    public void testProducerIdempotenceBucketFlush() throws Exception {
        requireTed();
        try (KafkaProducer<byte[], byte[]> kafkaproducer = newIdempotentProducer();
             KafkaConsumer<byte[], byte[]> kafkaconsumer = new KafkaConsumer<byte[], byte[]>(consumerProps())) {
            madmin.createTopic(STREAM_BUCKET, TOPIC, numParts);
            String topicPath = STREAM_BUCKET + ":" + TOPIC;
            int msgSize = TWO_MIB;
            int numMsgs = 500;
            produceMsgs(topicPath, kafkaproducer, numMsgs, numParts, msgSize, false);
            // NOTE(review): presumably long enough for the server-side bucket flush
            // this test is named after — confirm the intended interval.
            Thread.sleep(5000L);
            produceMsgs(topicPath, kafkaproducer, numMsgs, numParts, msgSize, false);
            consumeMsgs(topicPath, kafkaconsumer, numMsgs * 2, numParts);
            kafkaconsumer.unsubscribe();
        }
    }

    /**
     * Sets the stream's region size to 256 MB via {@code maprcli}, then produces
     * 1000 messages per partition under fault injection and checks the consumed count.
     */
    @Test
    public void testProducerIdempotenceWithTabletSplit() throws Exception {
        requireTed();
        ProcessBuilder editRegionSize = new ProcessBuilder("maprcli", "table", "edit", "-path", STREAM_TABLET_SPLIT, "-regionsizemb", "256");
        // Inherit stdio so the child can never block on a full, unread pipe.
        editRegionSize.inheritIO();
        int exitCode = editRegionSize.start().waitFor();
        if (exitCode != 0) {
            _logger.warn("maprcli table edit for {} exited with status {}", STREAM_TABLET_SPLIT, exitCode);
        }

        Properties listenerProps = consumerProps();
        listenerProps.put("max.partition.fetch.bytes", Integer.valueOf(TEN_MIB));
        try (KafkaProducer<byte[], byte[]> kafkaproducer = newIdempotentProducer();
             KafkaConsumer<byte[], byte[]> kafkaconsumer = new KafkaConsumer<byte[], byte[]>(listenerProps)) {
            madmin.createTopic(STREAM_TABLET_SPLIT, TOPIC, numParts);
            // NOTE(review): the original declared a 2 MiB message size here but never
            // passed it to the producer, so default-sized messages are sent; confirm
            // whether large messages were intended to force a tablet split.
            int numMsgs = 1000;
            produceAndConsumeMsgs(STREAM_TABLET_SPLIT + ":" + TOPIC, kafkaproducer, kafkaconsumer, numMsgs, numParts);
            kafkaconsumer.unsubscribe();
        }
    }
}

