/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.db.TableDescriptor;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TimestampType;
import com.mapr.streams.TopicDescriptor;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.listener.ListenerV10;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.listener.TestConsumerInterceptor;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.streams.tests.producer.SendTimedMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ListenerV10Test
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerV10Test.class);
    private static final String PREFIX = "/jtest-" + ListenerV10Test.class.getSimpleName() + "-";
    private static Admin madmin;
    private static final int numParts = 4;

    @BeforeClass
    public static void setupTest() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        String sname = PREFIX + "pollMaxRecords";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenertimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerlogtimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listeneredittimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptoronconsume";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorasync";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorautocommit";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorasynccb";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorasynccbpart";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorsync";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerinterceptorsyncpart";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "pollmaxfetchsize";
        try {
            madmin.deleteStream(sname + 0);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "pollMaxRecords";
        try {
            madmin.deleteStream(sname + 0);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "headers";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenertimeindex";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenertimeindexmultiplefeeds";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenertimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerlogtimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listeneredittimedmessages";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerpaused";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetssingle";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetsmultitopics";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetstopictimeout";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetsparttimeout";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetsmultistreams1";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetsmultistreams2";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "endoffsetsnostream";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "begoffsetssingle";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "begoffsetsmultitopics";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "begoffsetsmultistreams1";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "begoffsetsmultistreams2";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "closewithtimeout";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "closewithlargetimeout";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testCloseWithLargeTimeout() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        String sname = PREFIX + "closewithlargetimeout";
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        madmin.createTopic(sname, "t", 4);
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)true);
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        int numMsgs = 10000;
        byte[] key = new byte[10];
        byte[] value = new byte[10];
        String streamTopicName = sname + topicName;
        for (int i = 0; i < numMsgs; ++i) {
            for (int j = 0; j < 4; ++j) {
                ProducerRecord record = new ProducerRecord(streamTopicName, Integer.valueOf(j), (Object)key, (Object)value);
                kafkaproducer.send(record);
            }
        }
        long startTime = System.currentTimeMillis();
        kafkaproducer.close(50L, TimeUnit.SECONDS);
        long stopTime = System.currentTimeMillis();
        Assert.assertTrue((stopTime - startTime < 2000L ? 1 : 0) != 0);
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(streamTopicName);
        kafkaconsumer.subscribe(topics);
        int totalNumMsgs = 0;
        Set subscribed = kafkaconsumer.assignment();
        Assert.assertTrue((subscribed.size() == 4 ? 1 : 0) != 0);
        for (int i = 0; i < 4; ++i) {
            ConsumerRecords recs = kafkaconsumer.poll(1000L);
            totalNumMsgs += recs.count();
        }
        kafkaconsumer.unsubscribe();
        startTime = System.currentTimeMillis();
        kafkaconsumer.close(50L, TimeUnit.SECONDS);
        stopTime = System.currentTimeMillis();
        Assert.assertTrue((stopTime - startTime < 2000L ? 1 : 0) != 0);
        madmin.deleteStream(sname);
    }

    @Test
    public void testCloseWithTimeout() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        String sname = PREFIX + "closewithtimeout";
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        madmin.createTopic(sname, "t", 4);
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)true);
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        int numMsgs = 10000;
        byte[] key = new byte[10];
        byte[] value = new byte[10];
        String streamTopicName = sname + topicName;
        for (int i = 0; i < numMsgs; ++i) {
            for (int j = 0; j < 4; ++j) {
                ProducerRecord record = new ProducerRecord(streamTopicName, Integer.valueOf(j), (Object)key, (Object)value);
                kafkaproducer.send(record);
            }
        }
        kafkaproducer.close(1L, TimeUnit.MILLISECONDS);
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(streamTopicName);
        kafkaconsumer.subscribe(topics);
        int totalNumMsgs = 0;
        Set subscribed = kafkaconsumer.assignment();
        Assert.assertTrue((subscribed.size() == 4 ? 1 : 0) != 0);
        for (int i = 0; i < 4; ++i) {
            ConsumerRecords recs = kafkaconsumer.poll(1000L);
            totalNumMsgs += recs.count();
        }
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close(1L, TimeUnit.MILLISECONDS);
        madmin.deleteStream(sname);
    }

    @Test
    public void testBegOffsetsMultipleStreams() throws IOException {
        TopicPartition tp;
        int j;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        String sname1 = PREFIX + "begoffsetsmultistreams1";
        madmin.createStream(sname1, sdesc);
        String sname2 = PREFIX + "begoffsetsmultistreams2";
        madmin.createStream(sname2, sdesc);
        String topicName1 = ":t1";
        madmin.createTopic(sname1, "t1", 4);
        String topicName2 = ":t2";
        madmin.createTopic(sname2, "t2", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.getProducer();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        int[] numMsgs = new int[]{256, 128};
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs[0] * 4);
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname1 + topicName1, 4, numMsgs[0]);
        producer1.run();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs[1] * 4);
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname2 + topicName2, 4, numMsgs[1]);
        producer2.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (j = 0; j < 4; ++j) {
            tp = new TopicPartition(sname1 + topicName1, j);
            tpList.add(tp);
        }
        for (j = 3; j >= 0; --j) {
            tp = new TopicPartition(sname2 + topicName2, j);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.beginningOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[8];
        tpList.toArray(tpArray);
        System.err.println("out size: " + out.size());
        for (int i = 0; i < 2; ++i) {
            for (int j2 = 0; j2 < 4; ++j2) {
                System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i]) + " index : " + i);
                Assert.assertTrue(((Long)out.get(tpArray[j2 + i * 4]) == 0L ? 1 : 0) != 0);
            }
        }
        madmin.deleteStream(sname1);
        madmin.deleteStream(sname2);
    }

    @Test
    public void testBegOffsetsMultipleTopics() throws IOException {
        TopicPartition tp;
        int j;
        String sname = PREFIX + "begoffsetsmultitopics";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName1 = ":t1";
        madmin.createTopic(sname, "t1", 4);
        String topicName2 = ":t2";
        madmin.createTopic(sname, "t2", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.getProducer();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        int[] numMsgs = new int[3];
        numMsgs[0] = 256;
        numMsgs[1] = 128;
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs[0] * 4);
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname + topicName1, 4, numMsgs[0]);
        producer1.run();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs[1] * 4);
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname + topicName2, 4, numMsgs[1]);
        producer2.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (j = 3; j >= 0; --j) {
            tp = new TopicPartition(sname + topicName1, j);
            tpList.add(tp);
        }
        for (j = 0; j < 4; ++j) {
            tp = new TopicPartition(sname + topicName2, j);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.beginningOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[8];
        tpList.toArray(tpArray);
        for (int i = 0; i < 2; ++i) {
            for (int j2 = 0; j2 < 4; ++j2) {
                System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i]) + " index : " + i);
                Assert.assertTrue(((Long)out.get(tpArray[j2 + i * 4]) == 0L ? 1 : 0) != 0);
            }
        }
        madmin.deleteStream(sname);
    }

    @Test
    public void testBegOffsetsSingleTopic() throws IOException {
        int i;
        String sname = PREFIX + "begoffsetssingle";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 4);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 4, numMsgs);
        producer.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (int i2 = 0; i2 < 4; ++i2) {
            TopicPartition tp = new TopicPartition(sname + topicName, i2);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.beginningOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[4];
        tpList.toArray(tpArray);
        for (int i3 = 0; i3 < 4; ++i3) {
            System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i3]) + " index : " + i3);
            Assert.assertTrue(((Long)out.get(tpArray[i3]) == 0L ? 1 : 0) != 0);
        }
        String topicName2 = ":t2";
        madmin.createTopic(sname, "t2", 4);
        tpList = new ArrayList();
        for (i = 0; i < 4; ++i) {
            TopicPartition tp = new TopicPartition(sname + topicName2, i);
            tpList.add(tp);
        }
        out = kafkaconsumer.beginningOffsets(tpList);
        tpList.toArray(tpArray);
        for (i = 0; i < 4; ++i) {
            System.err.println("out Offset : " + out.get(tpArray[i]) + " index : " + i);
            Assert.assertTrue(((Long)out.get(tpArray[i]) == 0L ? 1 : 0) != 0);
        }
        madmin.deleteStream(sname);
    }

    @Test
    public void testEndOffsetsNoPartitions() throws IOException {
        String sname = PREFIX + "endoffsetssingle";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 128;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 4);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 4, numMsgs);
        producer.run();
        ArrayList tpList = new ArrayList();
        Map out = kafkaconsumer.endOffsets(tpList);
        Assert.assertTrue((out.size() == 0 ? 1 : 0) != 0);
        madmin.deleteStream(sname);
    }

    @Test
    public void testEndOffsetsNoStream() throws IOException {
        String sname = PREFIX + "endoffsetsnostream";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        TopicPartition tp = new TopicPartition(sname + ":t", 0);
        tpList.add(tp);
        Exception ex = null;
        try {
            Map out = kafkaconsumer.endOffsets(tpList);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        }
        catch (Exception e) {
            _logger.info("Hit exception " + e);
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof IllegalArgumentException));
    }

    @Test
    public void testEndOffsetsPartitionTimeout() throws Exception {
        int i;
        final String sname = PREFIX + "endoffsetsparttimeout";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 4);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 4, numMsgs);
        producer.run();
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (int i2 = 0; i2 < 6; ++i2) {
            TopicPartition tp = new TopicPartition(sname + topicName, i2);
            tpList.add(tp);
        }
        Exception ex = null;
        try {
            Map out = kafkaconsumer.endOffsets(tpList);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        }
        catch (Exception e) {
            _logger.info("Hit exception " + e);
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof TimeoutException));
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(5000L);
                    madmin.editTopic(sname, "t", 6);
                }
                catch (IOException | InterruptedException exception) {
                    // empty catch block
                }
            }
        });
        t.start();
        ex = null;
        Map out = null;
        try {
            out = kafkaconsumer.endOffsets(tpList);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex == null ? 1 : 0) != 0);
        TopicPartition[] tpArray = new TopicPartition[6];
        tpList.toArray(tpArray);
        for (i = 0; i < 4; ++i) {
            System.err.println("numMsgs : " + numMsgs + " end Offset : " + out.get(tpArray[i]) + " index : " + i);
            Assert.assertTrue(((Long)out.get(tpArray[i]) == (long)numMsgs ? 1 : 0) != 0);
        }
        for (i = 4; i < 6; ++i) {
            System.err.println(" End Offset : " + out.get(tpArray[i]) + " index : " + i);
            Assert.assertTrue(((Long)out.get(tpArray[i]) == 0L ? 1 : 0) != 0);
        }
        t.join();
        madmin.deleteStream(sname);
    }

    @Test
    public void testEndOffsetsTopicTimeout() throws Exception {
        final String sname = PREFIX + "endoffsetstopictimeout";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        TopicPartition tp = new TopicPartition(sname + ":t", 0);
        tpList.add(tp);
        Exception ex = null;
        try {
            ex = null;
            Map out = kafkaconsumer.endOffsets(tpList);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        }
        catch (Exception e) {
            _logger.info("Hit exception " + e);
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof TimeoutException));
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(5000L);
                    madmin.createTopic(sname, "t", 1);
                }
                catch (IOException | InterruptedException exception) {
                    // empty catch block
                }
            }
        });
        t.start();
        ex = null;
        Map out = null;
        try {
            out = kafkaconsumer.endOffsets(tpList);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex == null ? 1 : 0) != 0);
        TopicPartition[] tpArray = new TopicPartition[4];
        tpList.toArray(tpArray);
        System.err.println(" end Offset : " + out.get(tpArray[0]) + " index : " + 0);
        Assert.assertTrue(((Long)out.get(tpArray[0]) == 0L ? 1 : 0) != 0);
        t.join();
        madmin.deleteStream(sname);
    }

    @Test
    public void testEndOffsetsMultipleStreams() throws IOException {
        TopicPartition tp;
        int j;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        String sname1 = PREFIX + "endoffsetsmultistreams1";
        madmin.createStream(sname1, sdesc);
        String sname2 = PREFIX + "endoffsetsmultistreams2";
        madmin.createStream(sname2, sdesc);
        String topicName1 = ":t1";
        madmin.createTopic(sname1, "t1", 4);
        String topicName2 = ":t2";
        madmin.createTopic(sname2, "t2", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.getProducer();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        int[] numMsgs = new int[]{256, 128};
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs[0] * 4);
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname1 + topicName1, 4, numMsgs[0]);
        producer1.run();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs[1] * 4);
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname2 + topicName2, 4, numMsgs[1]);
        producer2.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (j = 0; j < 4; ++j) {
            tp = new TopicPartition(sname1 + topicName1, j);
            tpList.add(tp);
        }
        for (j = 3; j >= 0; --j) {
            tp = new TopicPartition(sname2 + topicName2, j);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.endOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[8];
        tpList.toArray(tpArray);
        System.err.println("out size: " + out.size());
        for (int i = 0; i < 2; ++i) {
            for (int j2 = 0; j2 < 4; ++j2) {
                System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i]) + " index : " + i);
                Assert.assertTrue(((Long)out.get(tpArray[j2 + i * 4]) == (long)numMsgs[i] ? 1 : 0) != 0);
            }
        }
        madmin.deleteStream(sname1);
        madmin.deleteStream(sname2);
    }

    @Test
    public void testEndOffsetsMultipleTopics() throws IOException {
        TopicPartition tp;
        int j;
        String sname = PREFIX + "endoffsetsmultitopics";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName1 = ":t1";
        madmin.createTopic(sname, "t1", 4);
        String topicName2 = ":t2";
        madmin.createTopic(sname, "t2", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer1 = this.getProducer();
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        int[] numMsgs = new int[3];
        numMsgs[0] = 256;
        numMsgs[1] = 128;
        ProducerMultiTest.CountCallback cb1 = new ProducerMultiTest.CountCallback(numMsgs[0] * 4);
        SendMessagesToProducer producer1 = new SendMessagesToProducer(kafkaproducer1, cb1, sname + topicName1, 4, numMsgs[0]);
        producer1.run();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs[1] * 4);
        SendMessagesToProducer producer2 = new SendMessagesToProducer(kafkaproducer2, cb2, sname + topicName2, 4, numMsgs[1]);
        producer2.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (j = 3; j >= 0; --j) {
            tp = new TopicPartition(sname + topicName1, j);
            tpList.add(tp);
        }
        for (j = 0; j < 4; ++j) {
            tp = new TopicPartition(sname + topicName2, j);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.endOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[8];
        tpList.toArray(tpArray);
        for (int i = 0; i < 2; ++i) {
            for (int j2 = 0; j2 < 4; ++j2) {
                System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i]) + " index : " + i);
                Assert.assertTrue(((Long)out.get(tpArray[j2 + i * 4]) == (long)numMsgs[i] ? 1 : 0) != 0);
            }
        }
        madmin.deleteStream(sname);
    }

    @Test
    public void testEndOffsetsSingleTopic() throws IOException {
        int i;
        String sname = PREFIX + "endoffsetssingle";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 4);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 4, numMsgs);
        producer.run();
        ArrayList<TopicPartition> tpList = new ArrayList<TopicPartition>();
        for (int i2 = 0; i2 < 4; ++i2) {
            TopicPartition tp = new TopicPartition(sname + topicName, i2);
            tpList.add(tp);
        }
        Map out = kafkaconsumer.endOffsets(tpList);
        TopicPartition[] tpArray = new TopicPartition[4];
        tpList.toArray(tpArray);
        for (int i3 = 0; i3 < 4; ++i3) {
            System.err.println("numMsgs : " + numMsgs + " out Offset : " + out.get(tpArray[i3]) + " index : " + i3);
            Assert.assertTrue(((Long)out.get(tpArray[i3]) == (long)numMsgs ? 1 : 0) != 0);
        }
        String topicName2 = ":t2";
        madmin.createTopic(sname, "t2", 4);
        tpList = new ArrayList();
        for (i = 0; i < 4; ++i) {
            TopicPartition tp = new TopicPartition(sname + topicName2, i);
            tpList.add(tp);
        }
        out = kafkaconsumer.endOffsets(tpList);
        tpList.toArray(tpArray);
        for (i = 0; i < 4; ++i) {
            System.err.println("out Offset : " + out.get(tpArray[i]) + " index : " + i);
            Assert.assertTrue(((Long)out.get(tpArray[i]) == 0L ? 1 : 0) != 0);
        }
        madmin.deleteStream(sname);
    }

    @Test
    public void testHeaders() throws IOException {
        String sname = PREFIX + "headers";
        String snameFull = sname + 0;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        _logger.info("Populate stream and check listener");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false, 10, true));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 65536, 131072, 1, true, true));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false, 10, false));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 65536, 131072, 1, true, false));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testPollWithVaryingMaxFetchSize() throws IOException {
        String sname = PREFIX + "pollmaxfetchsize";
        String snameFull = sname + 0;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        _logger.info("Populate stream and check listener");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 131072, 1536, Integer.MAX_VALUE));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 131072, 10240, Integer.MAX_VALUE));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        _logger.info("test with topics where message are generated slowly");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 131072, 1536, Integer.MAX_VALUE));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 131072, 10240, Integer.MAX_VALUE));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        _logger.info("test with max fetch size lesser than msg size");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 10000, 131072, 108, Integer.MAX_VALUE));
        madmin.deleteStream(snameFull);
    }

    @Test
    public void testPollWithVaryingMaxRecords() throws IOException {
        String sname = PREFIX + "pollMaxRecords";
        String snameFull = sname + 0;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        _logger.info("Populate stream and check listener");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false, 10, true));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 65536, 131072, 1, true, true));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false, 10, true));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 65536, 131072, 10, true, true));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        _logger.info("test with topics where message are generated slowly");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false, 10, true));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 65536, 131072, 1, true, true));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false, 10, true));
        Assert.assertTrue((boolean)ListenerV10.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 65536, 131072, 10, true, true));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testListenerPaused() throws Exception {
        String sname = PREFIX + "listenerpaused";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        madmin.createTopic(sname, "t1", 4);
        madmin.createTopic(sname, "t2", 4);
        madmin.createTopic(sname, "t3", 4);
        TopicPartition tp1 = new TopicPartition(sname + ":t1", 0);
        TopicPartition tp2 = new TopicPartition(sname + ":t2", 0);
        TopicPartition tp3 = new TopicPartition(sname + ":t3", 0);
        List<TopicPartition> tpList = Arrays.asList(tp1, tp2, tp3);
        KafkaConsumer<byte[], byte[]> consumer = this.getConsumer();
        consumer.assign(tpList);
        consumer.pause(Arrays.asList(tp2, tp3));
        Set pausedParts = consumer.paused();
        Assert.assertTrue((pausedParts.contains(tp2) && pausedParts.contains(tp3) && !pausedParts.contains(tp1) ? 1 : 0) != 0);
        consumer.resume(Arrays.asList(tp2, tp3));
        pausedParts = consumer.paused();
        Assert.assertTrue((pausedParts.isEmpty() ? 1 : 0) != 0);
    }

    @Test
    public void testListenerTimeIndexDescending() throws Exception {
        String sname = PREFIX + "listenertimeindexdescending";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 100;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, -1, startTime);
        producer.run();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        Long ts = 910L;
        HashMap<TopicPartition, Long> tsSearch = new HashMap<TopicPartition, Long>();
        tsSearch.put(tp, ts);
        Map out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 1000L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 0L ? 1 : 0) != 0);
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerTimeIndex() throws Exception {
        String sname = PREFIX + "listenertimeindex";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        Long ts = 1009L;
        HashMap<TopicPartition, Long> tsSearch = new HashMap<TopicPartition, Long>();
        tsSearch.put(tp, ts);
        Map out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 1009L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 9L ? 1 : 0) != 0);
        startTime = 15000L;
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendTimedMessagesToProducer producer2 = new SendTimedMessagesToProducer(kafkaproducer2, cb2, sname + topicName, 1, numMsgs, startTime);
        producer2.run();
        tsSearch.put(tp, 15001L);
        out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 15001L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 257L ? 1 : 0) != 0);
        startTime = 5000L;
        KafkaProducer<byte[], byte[]> kafkaproducer3 = this.getProducer();
        ProducerMultiTest.CountCallback cb3 = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendTimedMessagesToProducer producer3 = new SendTimedMessagesToProducer(kafkaproducer3, cb3, sname + topicName, 1, numMsgs, startTime);
        producer3.run();
        out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 15001L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 257L ? 1 : 0) != 0);
        tsSearch.put(tp, 100L);
        out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 1000L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 0L ? 1 : 0) != 0);
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerTimeIndexSplit() throws Exception {
        String sname = PREFIX + "listenertimeindexsplit";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        madmin.createTopic(sname, "t", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        com.mapr.db.Admin dbAdmin = MapRDBImpl.newAdmin();
        TableDescriptor tableDesc = dbAdmin.getTableDescriptor(sname);
        long splitSizeMB = 256L;
        tableDesc.setSplitSize(splitSizeMB);
        dbAdmin.alterTable(tableDesc);
        int numMsgs = 1000000;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime, 20000);
        producer.run();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        Long ts = 1000000L;
        HashMap<TopicPartition, Long> tsSearch = new HashMap<TopicPartition, Long>();
        tsSearch.put(tp, ts);
        Map out = kafkaconsumer.offsetsForTimes(tsSearch);
        _logger.info("timestamp is " + ((OffsetAndTimestamp)out.get(tp)).timestamp() + " offset is " + ((OffsetAndTimestamp)out.get(tp)).offset());
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() >= 1000000L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 999000L ? 1 : 0) != 0);
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerTimeIndexMultipleFeeds() throws Exception {
        String sname = PREFIX + "listenertimeindexmultiplefeeds";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String sname2 = PREFIX + "listenertimeindexmultiplefeeds2";
        madmin.createStream(sname2, sdesc);
        String topicName = ":t";
        madmin.createTopic(sname, "t", 4);
        String topicName2 = ":t2";
        madmin.createTopic(sname2, "t2", 4);
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        startTime = 15000L;
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendTimedMessagesToProducer producer2 = new SendTimedMessagesToProducer(kafkaproducer2, cb2, sname2 + topicName2, 1, numMsgs, startTime);
        producer2.run();
        HashMap<TopicPartition, Long> tsSearch = new HashMap<TopicPartition, Long>();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        tsSearch.put(tp, 1009L);
        TopicPartition tp2 = new TopicPartition(sname2 + topicName2, 0);
        tsSearch.put(tp2, 15001L);
        Map out = kafkaconsumer.offsetsForTimes(tsSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 1009L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 9L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp2)).timestamp() == 15001L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp2)).offset() == 1L ? 1 : 0) != 0);
        kafkaconsumer.close();
        madmin.deleteStream(sname);
        madmin.deleteStream(sname2);
    }

    @Test
    public void testListenerTimeIndexTimeout() throws Exception {
        final String sname = PREFIX + "listenertimeindextimeout";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        madmin.createTopic(sname, "t", 4);
        String topicName2 = ":t2";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        HashMap<TopicPartition, Long> tsSearch = new HashMap<TopicPartition, Long>();
        tsSearch.put(tp, 1009L);
        TopicPartition tp2 = new TopicPartition(sname + topicName2, 0);
        tsSearch.put(tp2, 15001L);
        Exception ex = null;
        try {
            Map map = kafkaconsumer.offsetsForTimes(tsSearch);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof TimeoutException));
        Thread t = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(5000L);
                    madmin.createTopic(sname, "t2", 4);
                }
                catch (IOException | InterruptedException e) {
                    _logger.info("ignoring exception " + e);
                }
            }
        });
        t.start();
        ex = null;
        try {
            Map map = kafkaconsumer.offsetsForTimes(tsSearch);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((ex == null ? 1 : 0) != 0);
        t.join();
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerTimedMessages() throws Exception {
        ConsumerRecords recs;
        String sname = PREFIX + "listenertimedmessages";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        List<TopicPartition> tpList = Arrays.asList(new TopicPartition(sname + topicName, 0));
        kafkaconsumer.assign(tpList);
        for (int count = 0; count != numMsgs; count += recs.count()) {
            recs = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : recs) {
                Assert.assertTrue((rec.timestamp() == startTime++ ? 1 : 0) != 0);
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerZeroEventTime() throws Exception {
        String sname = PREFIX + "listenertimedmessages";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        Long i = 0L;
        while (i <= 3L) {
            kafkaproducer.send(new ProducerRecord(sname + topicName, Integer.valueOf(0), i, null, null));
            kafkaproducer.flush();
            i = i + 1L;
        }
        HashMap<TopicPartition, Long> tsToSearch = new HashMap<TopicPartition, Long>();
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        tsToSearch.put(tp, 0L);
        Map out = kafkaconsumer.offsetsForTimes(tsToSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 0L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 0L ? 1 : 0) != 0);
        tsToSearch.put(tp, 1L);
        out = kafkaconsumer.offsetsForTimes(tsToSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 1L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 1L ? 1 : 0) != 0);
        tsToSearch.put(tp, 2L);
        out = kafkaconsumer.offsetsForTimes(tsToSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 2L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 2L ? 1 : 0) != 0);
        tsToSearch.put(tp, 3L);
        out = kafkaconsumer.offsetsForTimes(tsToSearch);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).timestamp() == 3L ? 1 : 0) != 0);
        Assert.assertTrue((((OffsetAndTimestamp)out.get(tp)).offset() == 3L ? 1 : 0) != 0);
        kafkaproducer.close();
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    private KafkaProducer<byte[], byte[]> getProducer() {
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("streams.parallel.flushers.per.partition", "false");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        return kafkaproducer;
    }

    private KafkaConsumer<byte[], byte[]> getConsumer() {
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        return kafkaconsumer;
    }

    @Test
    public void testListenerLogTimedMessages() throws Exception {
        ConsumerRecords recs;
        String sname = PREFIX + "listenerlogtimedmessages";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        List<TopicPartition> tpList = Arrays.asList(new TopicPartition(sname + topicName, 0));
        kafkaconsumer.assign(tpList);
        for (int count = 0; count != numMsgs; count += recs.count()) {
            recs = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : recs) {
                Assert.assertTrue((rec.timestamp() != startTime++ ? 1 : 0) != 0);
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerEditTimedMessages() throws Exception {
        ConsumerRecords recs;
        int count;
        ConsumerRecords recs2;
        String sname = PREFIX + "listeneredittimedmessages";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultTimestampType(TimestampType.LOG_APPEND_TIME);
        sdesc.setTimeToLiveSec(0L);
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        KafkaProducer<byte[], byte[]> kafkaproducer = this.getProducer();
        KafkaConsumer<byte[], byte[]> kafkaconsumer = this.getConsumer();
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        long startTime = 1000L;
        SendTimedMessagesToProducer producer = new SendTimedMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, startTime);
        producer.run();
        List<TopicPartition> tpList = Arrays.asList(new TopicPartition(sname + topicName, 0));
        kafkaconsumer.assign(tpList);
        for (count = 0; count != numMsgs; count += recs2.count()) {
            recs2 = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : recs2) {
                Assert.assertTrue((boolean)rec.timestampType().equals((Object)org.apache.kafka.common.record.TimestampType.LOG_APPEND_TIME));
            }
        }
        TopicDescriptor td = Streams.newTopicDescriptor();
        td.setTimestampType(TimestampType.CREATE_TIME);
        madmin.editTopic(sname, "t", td);
        KafkaProducer<byte[], byte[]> kafkaproducer2 = this.getProducer();
        ProducerMultiTest.CountCallback cb2 = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendTimedMessagesToProducer producer2 = new SendTimedMessagesToProducer(kafkaproducer2, cb2, sname + topicName, 1, numMsgs, startTime);
        producer2.run();
        for (count = 0; count != numMsgs; count += recs.count()) {
            recs = kafkaconsumer.poll(100L);
            for (ConsumerRecord rec : recs) {
                Assert.assertTrue((boolean)rec.timestampType().equals((Object)org.apache.kafka.common.record.TimestampType.CREATE_TIME));
            }
        }
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    private void interceptorProduceAndConsumeMsgs(String topicName, KafkaProducer<byte[], byte[]> kafkaproducer, KafkaConsumer<byte[], byte[]> kafkaconsumer, int numMsgs, int numPartitions) {
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, topicName, numPartitions, numMsgs);
        producer.run();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(topicName);
        kafkaconsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        ConsumerRecords recs = kafkaconsumer.poll(0L);
        Assert.assertEquals((long)0L, (long)recs.count());
        recs = kafkaconsumer.poll(0L);
        Assert.assertEquals((long)(numMsgs * numPartitions), (long)recs.count());
    }

    @Test
    public void testInterceptorOnConsume() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptoronconsume";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 10;
        interceptor1.resetCounters();
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitAutoCommit() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorautocommit";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)true);
        listenerProps.put("auto.commit.interval.ms", (Object)1000);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 10;
        interceptor1.resetCounters();
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        try {
            Thread.sleep(5000L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        System.err.println(" On commit count " + interceptor1.getOnCommitCount());
        assert (interceptor1.getOnCommitCount() > 0);
        interceptor1.resetCounters();
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitAutoCommitOnClose() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "ListenerInterceptorAutoCommitOnClose";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("group.id", "autoCommitCloseTest");
        listenerProps.put("enable.auto.commit", (Object)true);
        listenerProps.put("auto.commit.interval.ms", (Object)100000);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 10;
        interceptor1.resetCounters();
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        System.err.println(" On commit count " + interceptor1.getOnCommitCount());
        assert (interceptor1.getOnCommitCount() > 0);
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitASync() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorasync";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 10;
        interceptor1.resetCounters();
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        kafkaconsumer.commitAsync();
        try {
            Thread.sleep(300L);
        }
        catch (Exception e) {
            System.out.println(e);
        }
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        Assert.assertEquals((long)1L, (long)interceptor1.getOnCommitCount());
        Assert.assertEquals((long)4L, (long)interceptor1.getNumParttionsOnCommit());
        interceptor1.resetCounters();
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitASyncCb() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorasynccb";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        interceptor1.resetCounters();
        int numMsgs = 10;
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        CommitCb ccb1 = new CommitCb();
        kafkaconsumer.commitAsync((OffsetCommitCallback)ccb1);
        ccb1.commitDone();
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        Assert.assertEquals((long)1L, (long)interceptor1.getOnCommitCount());
        Assert.assertEquals((long)4L, (long)interceptor1.getNumParttionsOnCommit());
        interceptor1.resetCounters();
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitASyncCbPart() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorasynccbpart";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        interceptor1.resetCounters();
        int numMsgs = 10;
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        TopicPartition[] topicpartitions = new TopicPartition[4];
        for (int i = 0; i < 4; ++i) {
            topicpartitions[i] = new TopicPartition(sname + topicName, i);
        }
        HashMap<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int i = 0; i < 4; i += 2) {
            toCommit.put(topicpartitions[i], new OffsetAndMetadata((long)(2 ^ i)));
        }
        CommitCb ccb2 = new CommitCb();
        kafkaconsumer.commitAsync(toCommit, (OffsetCommitCallback)ccb2);
        ccb2.commitDone();
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        Assert.assertEquals((long)1L, (long)interceptor1.getOnCommitCount());
        Assert.assertEquals((long)2L, (long)interceptor1.getNumParttionsOnCommit());
        interceptor1.resetCounters();
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitSync() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorsync";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        interceptor1.resetCounters();
        int numMsgs = 10;
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        kafkaconsumer.commitSync();
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        Assert.assertEquals((long)1L, (long)interceptor1.getOnCommitCount());
        Assert.assertEquals((long)4L, (long)interceptor1.getNumParttionsOnCommit());
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testInterceptorOnCommitSyncPart() throws Exception {
        TestConsumerInterceptor interceptor1 = new TestConsumerInterceptor();
        String sname = PREFIX + "listenerinterceptorsyncpart";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        interceptor1.resetCounters();
        int numMsgs = 10;
        this.interceptorProduceAndConsumeMsgs(sname + topicName, (KafkaProducer<byte[], byte[]>)kafkaproducer, (KafkaConsumer<byte[], byte[]>)kafkaconsumer, numMsgs, 4);
        TopicPartition[] topicpartitions = new TopicPartition[4];
        for (int i = 0; i < 4; ++i) {
            topicpartitions[i] = new TopicPartition(sname + topicName, i);
        }
        HashMap<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int i = 0; i < 4; i += 2) {
            toCommit.put(topicpartitions[i], new OffsetAndMetadata((long)(2 ^ i)));
        }
        kafkaconsumer.commitSync(toCommit);
        int expectedConsumes = 2;
        Assert.assertEquals((long)expectedConsumes, (long)interceptor1.getOnConsumeCount());
        Assert.assertEquals((long)(numMsgs * 4), (long)interceptor1.getNumRecordsOnConsume());
        Assert.assertEquals((long)1L, (long)interceptor1.getOnCommitCount());
        Assert.assertEquals((long)2L, (long)interceptor1.getNumParttionsOnCommit());
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    public final class CommitCb
    implements OffsetCommitCallback {
        private boolean committed = false;
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
        private Exception committedException = null;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            CommitCb commitCb = this;
            synchronized (commitCb) {
                this.committedOffsets = offsets;
                this.committedException = exception;
                this.committed = true;
                this.notifyAll();
            }
        }

        public synchronized void commitDone() {
            while (!this.committed) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
        }

        public Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets() {
            return this.committedOffsets;
        }

        public Exception getException() {
            return this.committedException;
        }
    }
}

