/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class OffsetTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(OffsetTest.class);
    private static final String STREAM = "/jtest-" + OffsetTest.class.getSimpleName();
    private static Admin madmin;
    private static KafkaProducer producer;
    private static KafkaConsumer consumer;
    private static final int numParts = 1;

    private static Properties GetProducerProps() throws Exception {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("streams.parallel.flushers.per.partition", (Object)false);
        return props;
    }

    private static Properties GetConsumerProps() throws Exception {
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", (Object)false);
        return props;
    }

    private static void CreateStream(String streamName) throws Exception {
        try {
            madmin.deleteStream(streamName);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        madmin.createStream(streamName, sdesc);
    }

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        OffsetTest.CreateStream(STREAM);
        producer = new KafkaProducer(OffsetTest.GetProducerProps());
        consumer = new KafkaConsumer(OffsetTest.GetConsumerProps());
    }

    @AfterClass
    public static void cleanupTest() throws Exception {
        producer.close();
        consumer.close();
        madmin.deleteStream(STREAM);
    }

    public List<Long> produceMsgs(String topicName, int msgSz, int nMsgs) throws Exception {
        ProducerRecord rec;
        StringBuffer outputBuffer = new StringBuffer(msgSz);
        for (int i = 0; i < msgSz; ++i) {
            outputBuffer.append("R");
        }
        String msgValue = outputBuffer.toString();
        ArrayList<Future> futureList = new ArrayList<Future>();
        for (int i = 0; i < nMsgs; ++i) {
            rec = new ProducerRecord(topicName, (Object)Integer.toString(i), (Object)msgValue);
            Future future = producer.send(rec);
            futureList.add(future);
        }
        producer.flush();
        rec = new ProducerRecord(topicName, (Object)"", (Object)"terminator");
        producer.send(rec);
        producer.flush();
        ArrayList<Long> offsetsList = new ArrayList<Long>(nMsgs);
        int idx = 0;
        for (Future future : futureList) {
            long offset = ((RecordMetadata)future.get()).offset();
            offsetsList.add(offset);
            ++idx;
        }
        _logger.info("produced " + nMsgs + " msgs, each of sz " + msgSz);
        return offsetsList;
    }

    public int consumeMsgs(String topicName, long offset) throws Exception {
        TopicPartition p0 = new TopicPartition(topicName, 0);
        ArrayList<TopicPartition> plist = new ArrayList<TopicPartition>();
        plist.add(p0);
        consumer.assign(plist);
        consumer.seek(p0, offset);
        boolean terminated = false;
        int numMsgs = 0;
        block0: while (!terminated) {
            ConsumerRecords consumerRecs = consumer.poll(10L);
            for (ConsumerRecord record : consumerRecs) {
                if (((String)record.value()).equals("terminator")) {
                    terminated = true;
                    continue block0;
                }
                ++numMsgs;
            }
        }
        consumer.unsubscribe();
        return numMsgs;
    }

    private void writeMsgs(KafkaProducer kp, String topicName, int nMsgs) throws Exception {
        String msgValue = "seekToEndMessage";
        for (int i = 0; i < nMsgs; ++i) {
            kp.send(new ProducerRecord(topicName, (Object)Integer.toString(i), (Object)msgValue));
        }
        kp.flush();
    }

    private void testSeekToEnd0(int topicCount, int emptyTopicIdx) throws Exception {
        int i;
        int i2;
        String[] topicName = new String[topicCount];
        int[] nMsgs = new int[topicCount];
        String streamName = STREAM + "-SeekToEnd0-" + emptyTopicIdx;
        OffsetTest.CreateStream(streamName);
        KafkaProducer pr = new KafkaProducer(OffsetTest.GetProducerProps());
        for (i2 = 0; i2 < topicCount; ++i2) {
            nMsgs[i2] = (i2 + 1) * 1000;
            topicName[i2] = "t" + i2;
            madmin.createTopic(streamName, topicName[i2], 1);
        }
        for (i2 = 0; i2 < topicCount; ++i2) {
            if (i2 == emptyTopicIdx) continue;
            this.writeMsgs(pr, streamName + ":" + topicName[i2], nMsgs[i2]);
        }
        KafkaConsumer cs = new KafkaConsumer(OffsetTest.GetConsumerProps());
        ArrayList<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
        for (i = 0; i < topicCount; ++i) {
            topicPartitionList.add(new TopicPartition(streamName + ":" + topicName[i], 0));
        }
        cs.assign(topicPartitionList);
        for (i = 0; i < topicCount; ++i) {
            cs.seekToEnd(new TopicPartition[]{(TopicPartition)topicPartitionList.get(i)});
            long seekToEndPosition = cs.position((TopicPartition)topicPartitionList.get(i));
            Assert.assertTrue((seekToEndPosition == (long)((i != emptyTopicIdx ? nMsgs[i] : 0) + 1) ? 1 : 0) != 0);
        }
        pr.close();
        cs.close();
        madmin.deleteStream(streamName);
    }

    private void testSeekToEnd0() throws Exception {
        int topicCount = 3;
        for (int i = 0; i < topicCount; ++i) {
            this.testSeekToEnd0(topicCount, i);
        }
    }

    private void testSeekToEnd1() throws Exception {
        int i;
        int topicCount = 5;
        String[] topicName = new String[topicCount];
        int[] nMsgs = new int[topicCount];
        String streamName = STREAM + "-SeekToEnd1";
        OffsetTest.CreateStream(streamName);
        KafkaProducer pr = new KafkaProducer(OffsetTest.GetProducerProps());
        for (int i2 = 0; i2 < topicCount; ++i2) {
            nMsgs[i2] = i2 == 1 || i2 == 3 ? 100 : 0;
            topicName[i2] = "t" + i2;
            madmin.createTopic(streamName, topicName[i2], 1);
        }
        KafkaConsumer cs = new KafkaConsumer(OffsetTest.GetConsumerProps());
        ArrayList<TopicPartition> topicPartitionList = new ArrayList<TopicPartition>();
        for (i = 0; i < topicCount; ++i) {
            topicPartitionList.add(new TopicPartition(streamName + ":" + topicName[i], 0));
        }
        cs.assign(topicPartitionList);
        for (i = 0; i < topicCount; ++i) {
            if (nMsgs[i] == 0) continue;
            this.writeMsgs(pr, streamName + ":" + topicName[i], nMsgs[i]);
        }
        for (i = 0; i < topicCount; ++i) {
            cs.seekToEnd(new TopicPartition[]{(TopicPartition)topicPartitionList.get(i)});
            long seekToEndPosition = cs.position((TopicPartition)topicPartitionList.get(i));
            Assert.assertTrue((seekToEndPosition == (long)(nMsgs[i] + 1) ? 1 : 0) != 0);
        }
        for (i = 0; i < topicCount; ++i) {
            if (nMsgs[i] == 0) continue;
            int nMessages = 100000;
            int n = i;
            nMsgs[n] = nMsgs[n] + 100000;
            this.writeMsgs(pr, streamName + ":" + topicName[i], 100000);
        }
        for (i = 0; i < topicCount; ++i) {
            cs.seekToEnd(new TopicPartition[]{(TopicPartition)topicPartitionList.get(i)});
            long seekToEndPosition = cs.position((TopicPartition)topicPartitionList.get(i));
            Assert.assertTrue((seekToEndPosition == (long)(nMsgs[i] + 1) ? 1 : 0) != 0);
        }
        for (i = 0; i < topicCount; ++i) {
            if (nMsgs[i] != 0) continue;
            nMsgs[i] = 1;
            this.writeMsgs(pr, streamName + ":" + topicName[i], nMsgs[i]);
        }
        for (i = 0; i < topicCount; ++i) {
            cs.seekToEnd(new TopicPartition[]{(TopicPartition)topicPartitionList.get(i)});
            long seekToEndPosition = cs.position((TopicPartition)topicPartitionList.get(i));
            Assert.assertTrue((seekToEndPosition == (long)(nMsgs[i] + 1) ? 1 : 0) != 0);
        }
        pr.close();
        cs.close();
        madmin.deleteStream(streamName);
    }

    @Test
    public void testSeekToEnd() throws Exception {
        this.testSeekToEnd0();
        this.testSeekToEnd1();
    }

    @Test
    public void testProducerOffsetsSmallMsgs() throws Exception {
        String topicName = STREAM + ":psmall";
        int numProduced = 2000;
        List<Long> offsets = this.produceMsgs(topicName, 1, numProduced);
        for (int i = 1; i < numProduced; ++i) {
            Assert.assertTrue((offsets.get(i) > offsets.get(i - 1) ? 1 : 0) != 0);
        }
    }

    @Test
    public void testProducerOffsetsLargeMsgs() throws Exception {
        String topicName = STREAM + ":plarge";
        int numProduced = 1000;
        List<Long> offsets = this.produceMsgs(topicName, 1024, numProduced);
        for (int i = 1; i < numProduced; ++i) {
            Assert.assertTrue((offsets.get(i) > offsets.get(i - 1) ? 1 : 0) != 0);
        }
    }

    @Test
    public void testSeeksWithSmallMsgs() throws Exception {
        String topicName = STREAM + ":small";
        int numProduced = 200;
        List<Long> offsets = this.produceMsgs(topicName, 1, numProduced);
        int numConsumed = this.consumeMsgs(topicName, 0L);
        _logger.info("consumed " + numConsumed + " msgs starting from offset 0");
        if (numConsumed != numProduced) {
            _logger.error("consumed " + numConsumed + " msgs starting from offset 0, expected " + numProduced);
            Assert.assertTrue((boolean)false);
        }
        for (int i = 0; i < numProduced; ++i) {
            numConsumed = this.consumeMsgs(topicName, offsets.get(i));
            if (numConsumed == numProduced - i) continue;
            _logger.error("consumed " + numConsumed + " msgs starting from offset " + offsets.get(i) + ", expected " + (numProduced - i));
            Assert.assertTrue((boolean)false);
        }
    }

    @Test
    public void testSeeksWithLargeMsgs() throws Exception {
        String topicName = STREAM + ":large";
        int numProduced = 100;
        List<Long> offsets = this.produceMsgs(topicName, 1000, numProduced);
        int numConsumed = this.consumeMsgs(topicName, 0L);
        if (numConsumed != numProduced) {
            _logger.error("consumed " + numConsumed + " msgs starting from offset 0, expected " + numProduced);
            Assert.assertTrue((boolean)false);
        }
        for (int i = 0; i < numProduced; ++i) {
            numConsumed = this.consumeMsgs(topicName, offsets.get(i));
            if (numConsumed == numProduced - i) continue;
            _logger.error("consumed " + numConsumed + " msgs starting from offset " + offsets.get(i) + ", expected " + (numProduced - i));
            Assert.assertTrue((boolean)false);
        }
    }
}

