/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class BasicListenerTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(BasicListenerTest.class);
    private static final String PREFIX = "/jtest-" + BasicListenerTest.class.getSimpleName() + "-";
    private static Admin madmin;
    private static final int numParts = 4;

    @BeforeClass
    public static void setupTest() throws Exception {
        int i;
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        String sname = PREFIX + "LGtopicSubscr";
        int numStreams = 2;
        for (int i2 = 0; i2 < numStreams; ++i2) {
            try {
                madmin.deleteStream(sname + i2);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        String sname1 = PREFIX + "topicSubscr";
        numStreams = 2;
        for (int i3 = 0; i3 < numStreams; ++i3) {
            try {
                madmin.deleteStream(sname1 + i3);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        String sname2 = PREFIX + "LGMultiListener";
        numStreams = 2;
        for (int i4 = 0; i4 < numStreams; ++i4) {
            try {
                madmin.deleteStream(sname2 + i4);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        sname = PREFIX + "singlestream";
        String snameFull = sname + 0;
        try {
            madmin.deleteStream(snameFull);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "multistream";
        numStreams = 4;
        for (i = 0; i < numStreams; ++i) {
            try {
                madmin.deleteStream(sname + i);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        sname = PREFIX + "multistreameh";
        numStreams = 4;
        for (i = 0; i < numStreams; ++i) {
            try {
                madmin.deleteStream(sname + i);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        sname = PREFIX + "pollfetchsize";
        try {
            madmin.deleteStream(sname + 0);
        }
        catch (Exception i5) {
            // empty catch block
        }
        sname = PREFIX + "topicSubscrHundred";
        try {
            numStreams = 2;
            for (int i6 = 0; i6 < numStreams; ++i6) {
                madmin.deleteStream(sname + i6);
            }
        }
        catch (Exception i6) {
            // empty catch block
        }
        sname = PREFIX + "LGtopicSubscrHundred";
        try {
            numStreams = 2;
            for (int i7 = 0; i7 < numStreams; ++i7) {
                madmin.deleteStream(sname + i7);
            }
        }
        catch (Exception i7) {
            // empty catch block
        }
        sname = PREFIX + "listenerbeforemsgs";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception i7) {
            // empty catch block
        }
        sname = PREFIX + "listenerseektoend";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception i7) {
            // empty catch block
        }
        sname = PREFIX + "listenerfirstpoll";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception i7) {
            // empty catch block
        }
        sname = PREFIX + "listenerseqoffset";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception i7) {
            // empty catch block
        }
        for (int i8 = 0; i8 < 10; ++i8) {
            sname = PREFIX + "listenerseqoffsetwithseek" + i8;
            try {
                madmin.deleteStream(sname);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        sname = PREFIX + "listenerzerooffset";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception i8) {
            // empty catch block
        }
        for (int i9 = 0; i9 < 5; ++i9) {
            sname = PREFIX + "listenerwithtabletsplit" + i9;
            try {
                madmin.deleteStream(sname);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        sname = PREFIX + "testlistenerseekafterpollzero";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
        sname = PREFIX + "listenerwithnullcommit";
        try {
            madmin.deleteStream(sname);
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testTopicSubscription() throws IOException {
        int i;
        String sname = PREFIX + "topicSubscr";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        int numStreams = 2;
        boolean messagesOrdered = false;
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 0, 4, 10000, !messagesOrdered));
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 2, 0, 4, 10000, true, messagesOrdered, null));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Test
    public void testTopicHundredSubscription() throws IOException {
        int i;
        String sname = PREFIX + "topicSubscrHundred";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        int numStreams = 2;
        boolean messagesOrdered = false;
        System.err.println("creating " + numStreams + " streams");
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        System.err.println("starting producer on 111 topics per streams");
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 111, 0, 4, 1000, !messagesOrdered));
        System.err.println("starting listener on 111 topics per streams");
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 111, 0, 4, 1000, true, messagesOrdered, null));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Ignore(value="http://bugs.corp.maprtech.com/show_bug.cgi?id=27414")
    @Test
    public void testLGTopicSubscription() throws IOException {
        int i;
        String sname = PREFIX + "LGtopicSubscr";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        int numStreams = 2;
        boolean messagesOrdered = false;
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 0, 4, 1000, !messagesOrdered));
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 2, 0, 4, 1000, true, messagesOrdered, "LGTopicTest"));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Ignore(value="http://bugs.corp.maprtech.com/show_bug.cgi?id=27411")
    @Test
    public void testLGTopicHundredSubscription() throws IOException {
        int i;
        String sname = PREFIX + "LGtopicSubscrHundred";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        int numStreams = 2;
        boolean messagesOrdered = false;
        System.err.println("creating " + numStreams + " streams");
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        System.err.println("starting producer on 111 topics per streams");
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 111, 0, 4, 1000, !messagesOrdered));
        System.err.println("starting listener group on 111 topics per streams");
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 111, 0, 4, 1000, true, messagesOrdered, "LGTopicTest"));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Test
    public void testLGMultipleListeners() throws IOException {
        int i;
        String sname = PREFIX + "LGMultiListener";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(7);
        Object ex = null;
        int numStreams = 2;
        boolean messagesOrdered = false;
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        Assert.assertTrue((boolean)Listener.runLGTest(sname, numStreams, 2, 7, true, "LGTest"));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Ignore(value="http://bugs.corp.maprtech.com/show_bug.cgi?id=27412")
    @Test
    public void testSingleStream() throws IOException {
        String sname = PREFIX + "singlestream";
        String snameFull = sname + 0;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Exception ex = null;
        madmin.createStream(snameFull, sdesc);
        _logger.info("Populate stream and check listener");
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTest(sname, 1, 2, 0, 4, 10000, "TPGrp"));
        _logger.info("test listener incorrect expected nummsgs");
        try {
            ex = null;
            Listener.runTest(sname, 1, 2, 0, 4, 20000, "TPGrp");
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof IOException));
        _logger.info("test with topics where message are generated slowly");
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTest(sname, 1, 2, 2, 4, 100000, "TPGrp"));
        madmin.deleteStream(snameFull);
    }

    @Ignore(value="http://bugs.corp.maprtech.com/show_bug.cgi?id=27413")
    @Test
    public void testMultipleStream() throws IOException {
        int i;
        int i2;
        String sname = PREFIX + "multistream";
        int numStreams = 4;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Object ex = null;
        for (i2 = 0; i2 < numStreams; ++i2) {
            madmin.createStream(sname + i2, sdesc);
        }
        _logger.info("Populate stream and check listener");
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 2, 0, 4, 10000));
        _logger.info("test with topics where message are generated slowly");
        for (i2 = 0; i2 < numStreams; ++i2) {
            madmin.deleteStream(sname + i2);
        }
        try {
            Thread.sleep(10000L);
        }
        catch (Exception i3) {
            // empty catch block
        }
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTest(sname, numStreams, 2, 2, 4, 100000));
        for (i = 0; i < numStreams; ++i) {
            madmin.deleteStream(sname + i);
        }
    }

    @Test
    public void testMultipleStreamErrorHandling() throws IOException {
        int i;
        String sname = PREFIX + "multistreameh";
        int numStreams = 4;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Exception ex = null;
        for (int i2 = 0; i2 < numStreams; ++i2) {
            madmin.createStream(sname + i2, sdesc);
        }
        _logger.info("Populate stream");
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 0, 4, 100000, false));
        _logger.info("test listener stream delete while polling");
        try {
            ex = null;
            Listener.runTest(sname, numStreams, 2, 0, 4, 100000, Listener.TED_ACTION.kDeleteStream);
            System.out.println("Test completed without errrosr !!");
        }
        catch (Exception e) {
            System.out.println("Got an exception" + e);
            ex = e;
        }
        Assert.assertTrue((ex instanceof NoOffsetForPartitionException || ex instanceof IOException ? 1 : 0) != 0);
        for (i = 0; i < numStreams; ++i) {
            try {
                madmin.deleteStream(sname + i);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        ex = null;
        if (System.getProperty("user.name").equals("root")) {
            return;
        }
        for (i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        _logger.info("Populate stream");
        Assert.assertTrue((boolean)Producer.runTest(sname, numStreams, 2, 0, 4, 100000, false));
        _logger.info("test listener stream changeperm while polling");
        try {
            ex = null;
            Listener.runTest(sname, numStreams, 2, 0, 4, 100000, Listener.TED_ACTION.kChangePerm);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof NoOffsetForPartitionException));
        Assert.assertTrue((boolean)ex.toString().contains("Permission denied"));
        for (int i3 = 0; i3 < numStreams; ++i3) {
            try {
                madmin.deleteStream(sname + i3);
                continue;
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    @Test
    public void testPollWithVaryingFetchSize() throws IOException {
        String sname = PREFIX + "pollfetchsize";
        String snameFull = sname + 0;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(4);
        Exception ex = null;
        _logger.info("Populate stream and check listener");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 1536));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 0, 4, 10000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 0, 4, 10000, 10240));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        _logger.info("test with topics where message are generated slowly");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 1536));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 10240));
        madmin.deleteStream(snameFull);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        _logger.info("test with max fetch size lesser than msg size");
        madmin.createStream(snameFull, sdesc);
        Assert.assertTrue((boolean)Producer.runTest(sname, 1, 2, 2, 4, 100000, false));
        try {
            ex = null;
            Assert.assertTrue((boolean)Listener.runTestWithPollOptions(sname, 1, 2, 2, 4, 100000, 108));
            _logger.info("Test completed without errors !! It was expected to throw exception");
        }
        catch (Exception e) {
            _logger.info("Hit exception " + e);
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof RecordTooLargeException));
        madmin.deleteStream(snameFull);
    }

    @Test
    public void testListenerBeforeMsgsAreFlushed() throws Exception {
        String sname = PREFIX + "listenerbeforemsgs";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        for (int i = 1; i < 4; ++i) {
            String topicName = ":t" + i;
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("streams.buffer.max.time.ms", "10000");
            KafkaProducer kafkaproducer = new KafkaProducer(props);
            Properties listenerProps = new Properties();
            listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("fetch.min.bytes", "1");
            listenerProps.put("auto.offset.reset", "earliest");
            KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
            int numParts = 1;
            int numMsgs = 100;
            ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
            SendMessagesToProducer worker = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, numParts, numMsgs);
            ConsumeMessages listenWorker = new ConsumeMessages(kafkaconsumer, sname + topicName, numParts, numMsgs, 20, false);
            Thread producer = new Thread(worker);
            producer.start();
            try {
                Thread.sleep(2000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            Thread consumer = new Thread(listenWorker);
            consumer.start();
            producer.join();
            consumer.join();
            Assert.assertTrue((boolean)listenWorker.hasPassed);
        }
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerWithSeekToEnd() throws Exception {
        String sname = PREFIX + "listenerseektoend";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numParts = 1;
        int numMsgs = 10;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * numParts);
        SendMessagesToProducer worker = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, numParts, numMsgs);
        ConsumeMessages listenWorker = new ConsumeMessages(kafkaconsumer, sname + topicName, numParts, numMsgs, 20, true);
        Thread producer = new Thread(worker);
        Thread consumer = new Thread(listenWorker);
        consumer.start();
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        producer.start();
        producer.join();
        consumer.join();
        Assert.assertTrue((boolean)listenWorker.hasPassed);
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerFirstPoll() throws Exception {
        ConsumerRecords recs;
        String sname = PREFIX + "listenerfirstpoll";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 10;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs);
        producer.run();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(sname + topicName);
        for (int i = 0; i < 3; ++i) {
            kafkaconsumer.subscribe(topics);
            try {
                Thread.sleep(2000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            recs = kafkaconsumer.poll(0L);
            Assert.assertTrue((recs.count() == 0 ? 1 : 0) != 0);
            recs = kafkaconsumer.poll(0L);
            Assert.assertTrue((recs.count() == numMsgs ? 1 : 0) != 0);
            kafkaconsumer.unsubscribe();
        }
        kafkaconsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        recs = kafkaconsumer.poll(1L);
        Assert.assertTrue((recs.count() == numMsgs ? 1 : 0) != 0);
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListenerSubsToTopicAfterSubsToTP() throws Exception {
        String sname = PREFIX + "listenersubstotopicaftersubstotp";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        String topic = sname + topicName;
        Properties cprops = new Properties();
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cprops.put("fetch.min.bytes", "1");
        cprops.put("auto.offset.reset", "earliest");
        KafkaConsumer consumer = new KafkaConsumer(cprops);
        ArrayList<TopicPartition> tp = new ArrayList<TopicPartition>();
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        tp.add(topicPartition);
        consumer.assign(tp);
        _logger.info("Consumer is subscribing to the topic partition : " + topicPartition);
        boolean exceptionThrown = false;
        try {
            _logger.info("Consumer is subscribing to the topic : " + topic);
            ArrayList<String> topics = new ArrayList<String>(1);
            topics.add(topic);
            consumer.subscribe(topics);
        }
        catch (Exception e) {
            exceptionThrown = true;
            e.printStackTrace();
        }
        finally {
            Assert.assertTrue((boolean)exceptionThrown);
        }
        consumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerWithNullKeyValue() throws Exception {
        long seekOffset;
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        String sname = PREFIX + "listenerwithnullkeyvalue";
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 1);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, 0);
        producer.run();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(sname + topicName);
        kafkaconsumer.subscribe(topics);
        long expectedOffset = seekOffset = 100L;
        int totalNumMsgs = 0;
        Set subscribed = kafkaconsumer.assignment();
        Assert.assertTrue((subscribed.size() == 1 ? 1 : 0) != 0);
        for (TopicPartition p : subscribed) {
            kafkaconsumer.seek(p, seekOffset);
        }
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        block3: while (true) {
            ConsumerRecords recs = kafkaconsumer.poll(1000L);
            totalNumMsgs += recs.count();
            if (recs.count() == 0) {
                Assert.assertTrue((String)("totalNumMsgs " + totalNumMsgs + " numMsgs " + numMsgs + " seekOffset " + seekOffset), ((long)totalNumMsgs == (long)numMsgs - seekOffset + 1L ? 1 : 0) != 0);
                break;
            }
            Iterator iter = recs.iterator();
            while (true) {
                if (!iter.hasNext()) continue block3;
                ConsumerRecord rec = (ConsumerRecord)iter.next();
                Assert.assertTrue((rec.offset() == expectedOffset ? 1 : 0) != 0);
                Assert.assertTrue((rec.key() == null ? 1 : 0) != 0);
                Assert.assertTrue((rec.value() == null ? 1 : 0) != 0);
                ++expectedOffset;
            }
            break;
        }
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        kafkaproducer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerSeqOffset() throws Exception {
        String sname = PREFIX + "listenerseqoffset";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaproducer = new KafkaProducer(props);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        int numMsgs = 256;
        ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
        SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs);
        producer.run();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(sname + topicName);
        kafkaconsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        ConsumerRecords recs = kafkaconsumer.poll(0L);
        Assert.assertTrue((recs.count() == 0 ? 1 : 0) != 0);
        recs = kafkaconsumer.poll(0L);
        Assert.assertTrue((recs.count() == numMsgs ? 1 : 0) != 0);
        Iterator iter = recs.iterator();
        long expectedOffset = 1L;
        while (iter.hasNext()) {
            ConsumerRecord rec = (ConsumerRecord)iter.next();
            Assert.assertTrue((rec.offset() == expectedOffset ? 1 : 0) != 0);
            ++expectedOffset;
        }
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerSeqOffsetWithSeek() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        for (int i = 0; i < 10; ++i) {
            long seekOffset;
            String sname = PREFIX + "listenerseqoffsetwithseek" + i;
            int msgSize = 5;
            if (i > 0) {
                msgSize = i * 512;
            }
            madmin.createStream(sname, sdesc);
            String topicName = ":t";
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer kafkaproducer = new KafkaProducer(props);
            Properties listenerProps = new Properties();
            listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("fetch.min.bytes", "1");
            listenerProps.put("auto.offset.reset", "earliest");
            KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
            madmin.createTopic(sname, "t", 1);
            int numMsgs = 256;
            ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
            SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, msgSize);
            producer.run();
            ArrayList<String> topics = new ArrayList<String>();
            topics.add(sname + topicName);
            kafkaconsumer.subscribe(topics);
            long expectedOffset = seekOffset = 100L;
            int totalNumMsgs = 0;
            Set subscribed = kafkaconsumer.assignment();
            Assert.assertTrue((subscribed.size() == 1 ? 1 : 0) != 0);
            for (TopicPartition p : subscribed) {
                kafkaconsumer.seek(p, seekOffset);
            }
            try {
                Thread.sleep(2000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            block4: while (true) {
                ConsumerRecords recs = kafkaconsumer.poll(1000L);
                totalNumMsgs += recs.count();
                if (recs.count() == 0) {
                    Assert.assertTrue(((long)totalNumMsgs == (long)numMsgs - seekOffset + 1L ? 1 : 0) != 0);
                    break;
                }
                Iterator iter = recs.iterator();
                while (true) {
                    if (!iter.hasNext()) continue block4;
                    ConsumerRecord rec = (ConsumerRecord)iter.next();
                    Assert.assertTrue((rec.offset() == expectedOffset ? 1 : 0) != 0);
                    Assert.assertTrue((((byte[])rec.key()).length == msgSize ? 1 : 0) != 0);
                    Assert.assertTrue((((byte[])rec.value()).length == msgSize ? 1 : 0) != 0);
                    ++expectedOffset;
                }
                break;
            }
            kafkaconsumer.unsubscribe();
            kafkaconsumer.close();
            kafkaproducer.close();
            madmin.deleteStream(sname);
        }
    }

    @Test
    public void testListenerZeroOffset() throws Exception {
        String sname = PREFIX + "listenerzerooffset";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("streams.zerooffset.record.on.eof", "true");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        madmin.createTopic(sname, "t", 4);
        ArrayList<String> topics = new ArrayList<String>();
        int numZeroOffsetMsgs = 0;
        topics.add(sname + topicName);
        kafkaconsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        numZeroOffsetMsgs = 0;
        ConsumerRecords recs = kafkaconsumer.poll(1000L);
        Assert.assertTrue((recs.count() == 4 ? 1 : 0) != 0);
        for (ConsumerRecord rec : recs) {
            if (rec.offset() != 0L) continue;
            ++numZeroOffsetMsgs;
        }
        Assert.assertTrue((numZeroOffsetMsgs == 4 ? 1 : 0) != 0);
        for (int i = 0; i < 10; ++i) {
            int numMsgs = 28;
            KafkaProducer kafkaproducer = new KafkaProducer(props);
            ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 4);
            SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 4, numMsgs);
            producer.run();
            numZeroOffsetMsgs = 0;
            recs = kafkaconsumer.poll(1000L);
            Assert.assertTrue((recs.count() == numMsgs * 4 ? 1 : 0) != 0);
            for (ConsumerRecord rec : recs) {
                Assert.assertTrue((rec.offset() > 0L ? 1 : 0) != 0);
            }
            try {
                Thread.sleep(1000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            int recCount = 0;
            while ((recs = kafkaconsumer.poll(1000L)).count() != 0) {
                recCount += recs.count();
                for (ConsumerRecord rec : recs) {
                    if (rec.offset() != 0L) continue;
                    ++numZeroOffsetMsgs;
                }
            }
            Assert.assertTrue((recCount == 4 ? 1 : 0) != 0);
            Assert.assertTrue((numZeroOffsetMsgs == 4 ? 1 : 0) != 0);
        }
        kafkaconsumer.unsubscribe();
        kafkaconsumer.close();
        madmin.deleteStream(sname);
    }

    @Test
    public void testListenerWithTabetSplit() throws Exception {
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setCompressionAlgo("off");
        for (int i = 0; i < 1; ++i) {
            ConsumerRecords recs;
            String sname = PREFIX + "listenerwithtabletsplit" + i;
            int msgSize = (i + 1) * 1024 * 1024;
            madmin.createStream(sname, sdesc);
            String topicName = ":t";
            Properties props = new Properties();
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer kafkaproducer = new KafkaProducer(props);
            Properties listenerProps = new Properties();
            listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            listenerProps.put("fetch.min.bytes", "1");
            listenerProps.put("auto.offset.reset", "earliest");
            listenerProps.put("max.partition.fetch.bytes", (Object)0xA00000);
            KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
            madmin.createTopic(sname, "t", 1);
            int numMsgs = 1000;
            ProducerMultiTest.CountCallback cb = new ProducerMultiTest.CountCallback(numMsgs * 1);
            SendMessagesToProducer producer = new SendMessagesToProducer(kafkaproducer, cb, sname + topicName, 1, numMsgs, msgSize);
            producer.run();
            ArrayList<String> topics = new ArrayList<String>();
            topics.add(sname + topicName);
            kafkaconsumer.subscribe(topics);
            int totalNumMsgs = 0;
            try {
                Thread.sleep(1000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            do {
                recs = kafkaconsumer.poll(2000L);
                totalNumMsgs += recs.count();
            } while (recs.count() != 0);
            Assert.assertTrue((totalNumMsgs == numMsgs ? 1 : 0) != 0);
            kafkaconsumer.unsubscribe();
            kafkaconsumer.close();
            kafkaproducer.close();
            madmin.deleteStream(sname);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testListenerSeekAfterPollZero() throws Exception {
        String sname = PREFIX + "testlistenerseekafterpollzero";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        madmin.createTopic(sname, "t", 1);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("group.id", "testlistenerseekafterpollzero");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        String topicName = ":t";
        TopicPartition tp = new TopicPartition(sname + topicName, 0);
        try {
            kafkaconsumer.subscribe(Arrays.asList(sname + topicName));
            kafkaconsumer.poll(0L);
            kafkaconsumer.seek(tp, 1L);
            _logger.debug("Seek after poll with zero timeout succeeded");
        }
        catch (Exception e) {
            System.err.println(e);
            Assert.assertTrue((boolean)false);
        }
        finally {
            kafkaconsumer.unsubscribe();
            kafkaconsumer.close();
            madmin.deleteStream(sname);
        }
    }

    @Test
    public void testListenerWithNullCommitted() throws Exception {
        String sname = PREFIX + "listenerwithnullcommit";
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        madmin.createStream(sname, sdesc);
        String topicName = ":t";
        TopicPartition p = new TopicPartition(sname + topicName, 0);
        Properties listenerProps = new Properties();
        listenerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        listenerProps.put("fetch.min.bytes", "1");
        listenerProps.put("enable.auto.commit", (Object)false);
        listenerProps.put("auto.offset.reset", "earliest");
        listenerProps.put("streams.zerooffset.record.on.eof", "true");
        listenerProps.put("group.id", "testnullcommitted");
        KafkaConsumer kafkaconsumer = new KafkaConsumer(listenerProps);
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(sname + topicName);
        kafkaconsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        Exception ex = null;
        try {
            kafkaconsumer.committed(p);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue((boolean)(ex instanceof UnknownTopicOrPartitionException));
        madmin.createTopic(sname, "t", 1);
        try {
            Thread.sleep(2000L);
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertTrue((kafkaconsumer.committed(p) == null ? 1 : 0) != 0);
    }

    public class ConsumeMessages
    implements Runnable {
        private String streamTopicName;
        private int numPartitions;
        private int numMsgsPerPartition;
        private KafkaConsumer consumer;
        private byte[] key;
        private byte[] value;
        private int numPolls;
        private int expectedNumMsgs;
        public boolean hasPassed;
        private boolean seekToEnd;

        public ConsumeMessages(KafkaConsumer c, String topicName, int numparts, int numMessagesPerPartition, int npolls, boolean seekToEnd) {
            this.streamTopicName = topicName;
            this.numPartitions = numparts;
            this.numMsgsPerPartition = numMessagesPerPartition;
            this.consumer = c;
            this.numPolls = npolls;
            this.expectedNumMsgs = this.numMsgsPerPartition * this.numPartitions;
            this.seekToEnd = seekToEnd;
        }

        @Override
        public void run() {
            ConsumerRecords recs;
            int numConsumedMsgs = 0;
            this.hasPassed = false;
            if (this.seekToEnd) {
                System.out.println("Seeking to end");
                ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
                for (int i = 0; i < this.numPartitions; ++i) {
                    partitions.add(new TopicPartition(this.streamTopicName, i));
                }
                this.consumer.assign(partitions);
                Set subscribed = this.consumer.assignment();
                for (TopicPartition p : subscribed) {
                    this.consumer.seekToEnd(new TopicPartition[]{p});
                    System.out.println("Subscribed to " + p.topic() + " partition:" + p.partition() + " position:" + this.consumer.position(p));
                }
            } else {
                ArrayList<String> topics = new ArrayList<String>();
                topics.add(this.streamTopicName);
                this.consumer.subscribe(topics);
            }
            for (int i = 0; i < this.numPolls && (numConsumedMsgs += (recs = this.consumer.poll(1000L)).count()) != this.expectedNumMsgs; ++i) {
            }
            _logger.info("Msgs recived " + numConsumedMsgs + ", expected " + this.expectedNumMsgs);
            this.consumer.close();
            if (numConsumedMsgs != this.expectedNumMsgs) {
                throw new RuntimeException("Num msgs received " + numConsumedMsgs + ", expected " + this.expectedNumMsgs);
            }
            this.hasPassed = true;
        }
    }
}

