package com.mapr.streams.tests.listener;

import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.TopicFeedInfo;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/BasicListenerTest.class */
public class BasicListenerTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(BasicListenerTest.class);
    private static final String PREFIX = "/jtest-" + BasicListenerTest.class.getSimpleName() + "-";
    private static Admin madmin;
    private static final int numParts = 4;

    /* loaded from: input_file:com/mapr/streams/tests/listener/BasicListenerTest$ConsumeMessages.class */
    public class ConsumeMessages implements Runnable {
        private String streamTopicName;
        private int numPartitions;
        private int numMsgsPerPartition;
        private KafkaConsumer consumer;
        private byte[] key;
        private byte[] value;
        private int numPolls;
        private int expectedNumMsgs;
        public boolean hasPassed;
        private boolean seekToEnd;

        public ConsumeMessages(KafkaConsumer kafkaConsumer, String str, int i, int i2, int i3, boolean z) {
            this.streamTopicName = str;
            this.numPartitions = i;
            this.numMsgsPerPartition = i2;
            this.consumer = kafkaConsumer;
            this.numPolls = i3;
            this.expectedNumMsgs = this.numMsgsPerPartition * this.numPartitions;
            this.seekToEnd = z;
        }

        @Override // java.lang.Runnable
        public void run() {
            int i = 0;
            this.hasPassed = false;
            if (this.seekToEnd) {
                System.out.println("Seeking to end");
                ArrayList arrayList = new ArrayList();
                for (int i2 = 0; i2 < this.numPartitions; i2++) {
                    arrayList.add(new TopicPartition(this.streamTopicName, i2));
                }
                this.consumer.assign(arrayList);
                for (TopicPartition topicPartition : this.consumer.assignment()) {
                    this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
                    System.out.println("Subscribed to " + topicPartition.topic() + " partition:" + topicPartition.partition() + " position:" + this.consumer.position(topicPartition));
                }
            } else {
                ArrayList arrayList2 = new ArrayList();
                arrayList2.add(this.streamTopicName);
                this.consumer.subscribe(arrayList2);
            }
            for (int i3 = 0; i3 < this.numPolls; i3++) {
                i += this.consumer.poll(1000L).count();
                if (i == this.expectedNumMsgs) {
                    break;
                }
            }
            BasicListenerTest._logger.info("Msgs recived " + i + ", expected " + this.expectedNumMsgs);
            if (this.expectedNumMsgs == 0) {
                Iterator it = this.consumer.assignment().iterator();
                while (it.hasNext()) {
                    long position = this.consumer.position((TopicPartition) it.next());
                    if (position != 0) {
                        throw new RuntimeException("Position expected is + " + this.expectedNumMsgs + " received: " + position);
                    }
                }
            }
            this.consumer.close();
            if (i != this.expectedNumMsgs) {
                throw new RuntimeException("Num msgs received " + i + ", expected " + this.expectedNumMsgs);
            }
            this.hasPassed = true;
        }
    }

    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());
        String str = PREFIX + "LGtopicSubscr";
        for (int i = 0; i < 2; i++) {
            try {
                madmin.deleteStream(str + i);
            } catch (Exception e) {
            }
        }
        String str2 = PREFIX + "topicSubscr";
        for (int i2 = 0; i2 < 2; i2++) {
            try {
                madmin.deleteStream(str2 + i2);
            } catch (Exception e2) {
            }
        }
        String str3 = PREFIX + "LGMultiListener";
        for (int i3 = 0; i3 < 2; i3++) {
            try {
                madmin.deleteStream(str3 + i3);
            } catch (Exception e3) {
            }
        }
        try {
            madmin.deleteStream((PREFIX + "singlestream") + 0);
        } catch (Exception e4) {
        }
        String str4 = PREFIX + "multistream";
        for (int i4 = 0; i4 < numParts; i4++) {
            try {
                madmin.deleteStream(str4 + i4);
            } catch (Exception e5) {
            }
        }
        String str5 = PREFIX + "multistreameh";
        for (int i5 = 0; i5 < numParts; i5++) {
            try {
                madmin.deleteStream(str5 + i5);
            } catch (Exception e6) {
            }
        }
        try {
            madmin.deleteStream((PREFIX + "pollfetchsize") + 0);
        } catch (Exception e7) {
        }
        String str6 = PREFIX + "topicSubscrHundred";
        for (int i6 = 0; i6 < 2; i6++) {
            try {
                madmin.deleteStream(str6 + i6);
            } catch (Exception e8) {
            }
        }
        String str7 = PREFIX + "LGtopicSubscrHundred";
        for (int i7 = 0; i7 < 2; i7++) {
            try {
                madmin.deleteStream(str7 + i7);
            } catch (Exception e9) {
            }
        }
        try {
            madmin.deleteStream(PREFIX + "listenerbeforemsgs");
        } catch (Exception e10) {
        }
        try {
            madmin.deleteStream(PREFIX + "listenerseektoend");
        } catch (Exception e11) {
        }
        try {
            madmin.deleteStream(PREFIX + "listenerfirstpoll");
        } catch (Exception e12) {
        }
        try {
            madmin.deleteStream(PREFIX + "listenerseqoffset");
        } catch (Exception e13) {
        }
        for (int i8 = 0; i8 < 10; i8++) {
            try {
                madmin.deleteStream(PREFIX + "listenerseqoffsetwithseek" + i8);
            } catch (Exception e14) {
            }
        }
        try {
            madmin.deleteStream(PREFIX + "listenernegativeoffset");
        } catch (Exception e15) {
        }
        for (int i9 = 0; i9 < 5; i9++) {
            try {
                madmin.deleteStream(PREFIX + "listenerwithtabletsplit" + i9);
            } catch (Exception e16) {
            }
        }
        try {
            madmin.deleteStream(PREFIX + "testlistenerseekafterpollzero");
        } catch (Exception e17) {
        }
        try {
            madmin.deleteStream(PREFIX + "listenerwithnullcommit");
        } catch (Exception e18) {
        }
    }

    @Test
    public void testTopicSubscription() throws IOException {
        String str = PREFIX + "topicSubscr";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < 2; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        Assert.assertTrue(Producer.runTest(str, 2, 2, 0, numParts, 10000, 0 == 0));
        Assert.assertTrue(Listener.runTest(str, 2, 2, 0, numParts, 10000, true, false, null));
        for (int i2 = 0; i2 < 2; i2++) {
            madmin.deleteStream(str + i2);
        }
    }

    @Test
    public void testTopicHundredSubscription() throws IOException {
        String str = PREFIX + "topicSubscrHundred";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        System.err.println("creating 2 streams");
        for (int i = 0; i < 2; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        System.err.println("starting producer on 111 topics per streams");
        Assert.assertTrue(Producer.runTest(str, 2, 111, 0, numParts, 1000, 0 == 0));
        System.err.println("starting listener on 111 topics per streams");
        Assert.assertTrue(Listener.runTest(str, 2, 111, 0, numParts, 1000, true, false, null));
        for (int i2 = 0; i2 < 2; i2++) {
            madmin.deleteStream(str + i2);
        }
    }

    @Test
    public void testLGTopicSubscription() throws IOException {
        String str = PREFIX + "LGtopicSubscr";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < 2; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        Assert.assertTrue(Producer.runTest(str, 2, 2, 0, numParts, 1000, 0 == 0));
        Assert.assertTrue(Listener.runTest(str, 2, 2, 0, numParts, 1000, true, false, "LGTopicTest"));
        for (int i2 = 0; i2 < 2; i2++) {
            madmin.deleteStream(str + i2);
        }
    }

    @Test
    public void testLGTopicHundredSubscription() throws IOException {
        String str = PREFIX + "LGtopicSubscrHundred";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        System.err.println("creating 2 streams");
        for (int i = 0; i < 2; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        System.err.println("starting producer on 111 topics per streams");
        Assert.assertTrue(Producer.runTest(str, 2, 111, 0, numParts, 1000, 0 == 0));
        System.err.println("starting listener group on 111 topics per streams");
        Assert.assertTrue(Listener.runTest(str, 2, 111, 0, numParts, 1000, true, false, "LGTopicTest"));
        for (int i2 = 0; i2 < 2; i2++) {
            madmin.deleteStream(str + i2);
        }
    }

    @Test
    public void testLGMultipleListeners() throws IOException {
        String str = PREFIX + "LGMultiListener";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(7);
        for (int i = 0; i < 2; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        Assert.assertTrue(Listener.runLGTest(str, 2, 2, 7, true, "LGTest"));
        for (int i2 = 0; i2 < 2; i2++) {
            madmin.deleteStream(str + i2);
        }
    }

    @Test
    public void testSingleStream() throws IOException {
        Exception exc;
        String str = PREFIX + "singlestream";
        String str2 = str + 0;
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        madmin.createStream(str2, newStreamDescriptor);
        _logger.info("Populate stream and check listener");
        Assert.assertTrue(Producer.runTest(str, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTest(str, 1, 2, 0, numParts, 10000, "TPGrp"));
        _logger.info("test listener incorrect expected nummsgs");
        try {
            exc = null;
            Listener.runTest(str, 1, 2, 0, numParts, 20000, "TPGrp");
        } catch (Exception e) {
            exc = e;
        }
        Assert.assertTrue(exc instanceof IOException);
        _logger.info("test with topics where message are generated slowly");
        madmin.deleteStream(str2);
        try {
            Thread.sleep(10000L);
        } catch (Exception e2) {
        }
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTest(str, 1, 2, 2, numParts, 100000, "TPGrp"));
        madmin.deleteStream(str2);
    }

    @Test
    public void testMultipleStream() throws IOException {
        String str = PREFIX + "multistream";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < numParts; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        _logger.info("Populate stream and check listener");
        Assert.assertTrue(Producer.runTest(str, numParts, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTest(str, numParts, 2, 0, numParts, 10000));
        _logger.info("test with topics where message are generated slowly");
        for (int i2 = 0; i2 < numParts; i2++) {
            madmin.deleteStream(str + i2);
        }
        try {
            Thread.sleep(10000L);
        } catch (Exception e) {
        }
        for (int i3 = 0; i3 < numParts; i3++) {
            madmin.createStream(str + i3, newStreamDescriptor);
        }
        Assert.assertTrue(Producer.runTest(str, numParts, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTest(str, numParts, 2, 2, numParts, 100000));
        for (int i4 = 0; i4 < numParts; i4++) {
            madmin.deleteStream(str + i4);
        }
    }

    @Test
    public void testMultipleStreamErrorHandling() throws IOException {
        Exception exc;
        Exception exc2;
        String str = PREFIX + "multistreameh";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < numParts; i++) {
            madmin.createStream(str + i, newStreamDescriptor);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(str, numParts, 2, 0, numParts, 100000, false));
        _logger.info("test listener stream delete while polling");
        try {
            exc = null;
            Listener.runTest(str, numParts, 2, 0, numParts, 100000, Listener.TED_ACTION.kDeleteStream);
            System.out.println("Test completed without errrosr !!");
        } catch (Exception e) {
            System.out.println("Got an exception" + e);
            exc = e;
        }
        Assert.assertTrue((exc instanceof NoOffsetForPartitionException) || (exc instanceof IOException));
        for (int i2 = 0; i2 < numParts; i2++) {
            try {
                madmin.deleteStream(str + i2);
            } catch (Exception e2) {
            }
        }
        if (System.getProperty("user.name").equals("root")) {
            return;
        }
        for (int i3 = 0; i3 < numParts; i3++) {
            madmin.createStream(str + i3, newStreamDescriptor);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(str, numParts, 2, 0, numParts, 100000, false));
        _logger.info("test listener stream changeperm while polling");
        try {
            exc2 = null;
            Listener.runTest(str, numParts, 2, 0, numParts, 100000, Listener.TED_ACTION.kChangePerm);
        } catch (Exception e3) {
            exc2 = e3;
        }
        Assert.assertTrue(exc2 instanceof NoOffsetForPartitionException);
        Assert.assertTrue(exc2.toString().contains("Permission denied"));
        for (int i4 = 0; i4 < numParts; i4++) {
            try {
                madmin.deleteStream(str + i4);
            } catch (Exception e4) {
            }
        }
    }

    @Test
    public void testPollWithVaryingFetchSize() throws IOException {
        Exception exc;
        String str = PREFIX + "pollfetchsize";
        String str2 = str + 0;
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(numParts);
        _logger.info("Populate stream and check listener");
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(str, 1, 2, 0, numParts, 10000, 1536));
        madmin.deleteStream(str2);
        try {
            Thread.sleep(10000L);
        } catch (Exception e) {
        }
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(str, 1, 2, 0, numParts, 10000, 10240));
        madmin.deleteStream(str2);
        try {
            Thread.sleep(10000L);
        } catch (Exception e2) {
        }
        _logger.info("test with topics where message are generated slowly");
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(str, 1, 2, 2, numParts, 100000, 1536));
        madmin.deleteStream(str2);
        try {
            Thread.sleep(10000L);
        } catch (Exception e3) {
        }
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(str, 1, 2, 2, numParts, 100000, 10240));
        madmin.deleteStream(str2);
        try {
            Thread.sleep(10000L);
        } catch (Exception e4) {
        }
        _logger.info("test with max fetch size lesser than msg size");
        madmin.createStream(str2, newStreamDescriptor);
        Assert.assertTrue(Producer.runTest(str, 1, 2, 2, numParts, 100000, false));
        try {
            exc = null;
            Assert.assertTrue(Listener.runTestWithPollOptions(str, 1, 2, 2, numParts, 100000, 108));
            _logger.info("Test completed without errors !! It was expected to throw exception");
        } catch (Exception e5) {
            _logger.info("Hit exception " + e5);
            exc = e5;
        }
        Assert.assertTrue(exc instanceof RecordTooLargeException);
        madmin.deleteStream(str2);
    }

    @Test
    public void testListenerBeforeMsgsAreFlushed() throws Exception {
        String str = PREFIX + "listenerbeforemsgs";
        madmin.createStream(str, Streams.newStreamDescriptor());
        for (int i = 1; i < numParts; i++) {
            String str2 = ":t" + i;
            Properties properties = new Properties();
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("streams.buffer.max.time.ms", "10000");
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            Properties properties2 = new Properties();
            properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("fetch.min.bytes", "1");
            properties2.put("auto.offset.reset", "earliest");
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
            SendMessagesToProducer sendMessagesToProducer = new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(100 * 1), str + str2, 1, 100);
            ConsumeMessages consumeMessages = new ConsumeMessages(kafkaConsumer, str + str2, 1, 100, 20, false);
            Thread thread = new Thread(sendMessagesToProducer);
            thread.start();
            try {
                Thread.sleep(2000L);
            } catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            Thread thread2 = new Thread(consumeMessages);
            thread2.start();
            thread.join();
            thread2.join();
            Assert.assertTrue(consumeMessages.hasPassed);
        }
        madmin.deleteStream(str);
    }

    @Test
    public void testListenerWithSeekToEndWithZeroMessage() throws Exception {
        testListenerWithSeekToEnd(0);
    }

    @Test
    public void testListenerWithSeekToEndWithMessage() throws Exception {
        testListenerWithSeekToEnd(10);
    }

    private void testListenerWithSeekToEnd(int i) throws Exception {
        String str = PREFIX + "listenerseektoend";
        Thread thread = null;
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultPartitions(1);
        madmin.createStream(str, newStreamDescriptor);
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("fetch.min.bytes", "1");
        properties.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        madmin.createTopic(str, "t", 1);
        SendMessagesToProducer sendMessagesToProducer = null;
        if (i > 0) {
            Properties properties2 = new Properties();
            properties2.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties2.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            sendMessagesToProducer = new SendMessagesToProducer(new KafkaProducer(properties2), new ProducerMultiTest.CountCallback(i * 1), str + ":t", 1, i);
        }
        ConsumeMessages consumeMessages = new ConsumeMessages(kafkaConsumer, str + ":t", 1, i, 20, true);
        if (i > 0) {
            thread = new Thread(sendMessagesToProducer);
        }
        Thread thread2 = new Thread(consumeMessages);
        thread2.start();
        try {
            Thread.sleep(2000L);
        } catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        if (i > 0) {
            thread.start();
            thread.join();
        }
        thread2.join();
        Iterator it = madmin.infoTopic(str + ":t").iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((TopicFeedInfo) it.next()).stat().getMaxSeq() == ((long) (i - 1)));
        }
        Assert.assertTrue(consumeMessages.hasPassed);
        madmin.deleteStream(str);
    }

    @Test
    public void testListenerFirstPoll() throws Exception {
        String str = PREFIX + "listenerfirstpoll";
        madmin.createStream(str, Streams.newStreamDescriptor());
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
        madmin.createTopic(str, "t", numParts);
        new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(10 * 1), str + ":t", 1, 10).run();
        ArrayList arrayList = new ArrayList();
        arrayList.add(str + ":t");
        for (int i = 0; i < 3; i++) {
            kafkaConsumer.subscribe(arrayList);
            try {
                Thread.sleep(2000L);
            } catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            Assert.assertTrue(kafkaConsumer.poll(0L).count() == 0);
            Assert.assertTrue(kafkaConsumer.poll(0L).count() == 10);
            kafkaConsumer.unsubscribe();
        }
        kafkaConsumer.subscribe(arrayList);
        try {
            Thread.sleep(2000L);
        } catch (Exception e2) {
            System.out.println("Sleep interrupted " + e2);
        }
        Assert.assertTrue(kafkaConsumer.poll(1L).count() == 10);
        kafkaConsumer.unsubscribe();
        kafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testListenerSubsToTopicAfterSubsToTP() throws Exception {
        String str = PREFIX + "listenersubstotopicaftersubstotp";
        madmin.createStream(str, Streams.newStreamDescriptor());
        String str2 = str + ":t";
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("fetch.min.bytes", "1");
        properties.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        ArrayList arrayList = new ArrayList();
        TopicPartition topicPartition = new TopicPartition(str2, 0);
        arrayList.add(topicPartition);
        kafkaConsumer.assign(arrayList);
        _logger.info("Consumer is subscribing to the topic partition : " + topicPartition);
        boolean z = false;
        try {
            try {
                _logger.info("Consumer is subscribing to the topic : " + str2);
                ArrayList arrayList2 = new ArrayList(1);
                arrayList2.add(str2);
                kafkaConsumer.subscribe(arrayList2);
                Assert.assertTrue(false);
            } catch (Exception e) {
                z = true;
                e.printStackTrace();
                Assert.assertTrue(true);
            }
            kafkaConsumer.close();
            madmin.deleteStream(str);
        } catch (Throwable th) {
            Assert.assertTrue(z);
            throw th;
        }
    }

    @Test
    public void testListenerWithNullKeyValue() throws Exception {
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setCompressionAlgo("off");
        String str = PREFIX + "listenerwithnullkeyvalue";
        madmin.createStream(str, newStreamDescriptor);
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
        madmin.createTopic(str, "t", 1);
        new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(256 * 1), str + ":t", 1, 256, 0).run();
        ArrayList arrayList = new ArrayList();
        arrayList.add(str + ":t");
        kafkaConsumer.subscribe(arrayList);
        long j = 100;
        int i = 0;
        Set assignment = kafkaConsumer.assignment();
        Assert.assertTrue(assignment.size() == 1);
        Iterator it = assignment.iterator();
        while (it.hasNext()) {
            kafkaConsumer.seek((TopicPartition) it.next(), 100L);
        }
        try {
            Thread.sleep(2000L);
        } catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        while (true) {
            ConsumerRecords poll = kafkaConsumer.poll(1000L);
            i += poll.count();
            if (poll.count() == 0) {
                break;
            }
            Iterator it2 = poll.iterator();
            while (it2.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                Assert.assertTrue(consumerRecord.offset() == j);
                Assert.assertTrue(consumerRecord.key() == null);
                Assert.assertTrue(consumerRecord.value() == null);
                j++;
            }
        }
        Assert.assertTrue("totalNumMsgs " + i + " numMsgs 256 seekOffset 100", ((long) i) == ((long) 256) - 100);
        kafkaConsumer.unsubscribe();
        kafkaConsumer.close();
        kafkaProducer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testListenerSeqOffset() throws Exception {
        String str = PREFIX + "listenerseqoffset";
        madmin.createStream(str, Streams.newStreamDescriptor());
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
        madmin.createTopic(str, "t", numParts);
        new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(256 * 1), str + ":t", 1, 256).run();
        ArrayList arrayList = new ArrayList();
        arrayList.add(str + ":t");
        kafkaConsumer.subscribe(arrayList);
        try {
            Thread.sleep(2000L);
        } catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        Assert.assertTrue(kafkaConsumer.poll(0L).count() == 0);
        ConsumerRecords poll = kafkaConsumer.poll(0L);
        Assert.assertTrue(poll.count() == 256);
        Iterator it = poll.iterator();
        long j = 0;
        while (true) {
            long j2 = j;
            if (!it.hasNext()) {
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
                madmin.deleteStream(str);
                return;
            }
            Assert.assertTrue(((ConsumerRecord) it.next()).offset() == j2);
            j = j2 + 1;
        }
    }

    @Test
    public void testListenerSeqOffsetWithSeek() throws Exception {
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setCompressionAlgo("off");
        for (int i = 0; i < 10; i++) {
            String str = PREFIX + "listenerseqoffsetwithseek" + i;
            int i2 = i > 0 ? i * 512 : 5;
            madmin.createStream(str, newStreamDescriptor);
            Properties properties = new Properties();
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            Properties properties2 = new Properties();
            properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("fetch.min.bytes", "1");
            properties2.put("auto.offset.reset", "earliest");
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
            madmin.createTopic(str, "t", 1);
            new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(256 * 1), str + ":t", 1, 256, i2).run();
            ArrayList arrayList = new ArrayList();
            arrayList.add(str + ":t");
            kafkaConsumer.subscribe(arrayList);
            long j = 100;
            int i3 = 0;
            Set assignment = kafkaConsumer.assignment();
            Assert.assertTrue(assignment.size() == 1);
            Iterator it = assignment.iterator();
            while (it.hasNext()) {
                kafkaConsumer.seek((TopicPartition) it.next(), 100L);
            }
            try {
                Thread.sleep(2000L);
            } catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            while (true) {
                ConsumerRecords poll = kafkaConsumer.poll(1000L);
                i3 += poll.count();
                if (poll.count() == 0) {
                    break;
                }
                Iterator it2 = poll.iterator();
                while (it2.hasNext()) {
                    ConsumerRecord consumerRecord = (ConsumerRecord) it2.next();
                    Assert.assertTrue(consumerRecord.offset() == j);
                    Assert.assertTrue(((byte[]) consumerRecord.key()).length == i2);
                    Assert.assertTrue(((byte[]) consumerRecord.value()).length == i2);
                    j++;
                }
            }
            Assert.assertTrue(((long) i3) == ((long) 256) - 100);
            kafkaConsumer.unsubscribe();
            kafkaConsumer.close();
            kafkaProducer.close();
            madmin.deleteStream(str);
        }
    }

    @Test
    public void testListenerNegativeOffset() throws Exception {
        String str = PREFIX + "listenernegativeoffset";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setCompressionAlgo("off");
        madmin.createStream(str, newStreamDescriptor);
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("streams.negativeoffset.record.on.eof", "true");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
        madmin.createTopic(str, "t", numParts);
        ArrayList arrayList = new ArrayList();
        arrayList.add(str + ":t");
        kafkaConsumer.subscribe(arrayList);
        try {
            Thread.sleep(2000L);
        } catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        int i = 0;
        ConsumerRecords poll = kafkaConsumer.poll(1000L);
        Assert.assertTrue(poll.count() == numParts);
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            if (((ConsumerRecord) it.next()).offset() == -1001) {
                i++;
            }
        }
        Assert.assertTrue(i == numParts);
        for (int i2 = 0; i2 < 10; i2++) {
            new SendMessagesToProducer(new KafkaProducer(properties), new ProducerMultiTest.CountCallback(28 * numParts), str + ":t", numParts, 28).run();
            int i3 = 0;
            ConsumerRecords poll2 = kafkaConsumer.poll(1000L);
            Assert.assertTrue(poll2.count() == 28 * numParts);
            Iterator it2 = poll2.iterator();
            while (it2.hasNext()) {
                Assert.assertTrue(((ConsumerRecord) it2.next()).offset() >= 0);
            }
            try {
                Thread.sleep(1000L);
            } catch (Exception e2) {
                System.out.println("Sleep interrupted " + e2);
            }
            int i4 = 0;
            while (true) {
                ConsumerRecords poll3 = kafkaConsumer.poll(1000L);
                if (poll3.count() == 0) {
                    break;
                }
                i4 += poll3.count();
                Iterator it3 = poll3.iterator();
                while (it3.hasNext()) {
                    if (((ConsumerRecord) it3.next()).offset() == -1001) {
                        i3++;
                    }
                }
            }
            Assert.assertTrue(i4 == numParts);
            Assert.assertTrue(i3 == numParts);
        }
        kafkaConsumer.unsubscribe();
        kafkaConsumer.close();
        madmin.deleteStream(str);
    }

    @Test
    public void testListenerWithTabletSplit() throws Exception {
        ConsumerRecords poll;
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setCompressionAlgo("off");
        for (int i = 0; i < 1; i++) {
            String str = PREFIX + "listenerwithtabletsplit" + i;
            madmin.createStream(str, newStreamDescriptor);
            Properties properties = new Properties();
            properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            Properties properties2 = new Properties();
            properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties2.put("fetch.min.bytes", "1");
            properties2.put("auto.offset.reset", "earliest");
            properties2.put("max.partition.fetch.bytes", 10485760);
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties2);
            madmin.createTopic(str, "t", 1);
            new SendMessagesToProducer(kafkaProducer, new ProducerMultiTest.CountCallback(1000 * 1), str + ":t", 1, 1000, (i + 1) * 1024 * 1024).run();
            ArrayList arrayList = new ArrayList();
            arrayList.add(str + ":t");
            kafkaConsumer.subscribe(arrayList);
            int i2 = 0;
            try {
                Thread.sleep(5000L);
            } catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            do {
                poll = kafkaConsumer.poll(2000L);
                i2 += poll.count();
            } while (poll.count() != 0);
            Assert.assertTrue(i2 == 1000);
            kafkaConsumer.unsubscribe();
            kafkaConsumer.close();
            kafkaProducer.close();
            madmin.deleteStream(str);
        }
    }

    @Test
    public void testListenerSeekAfterPollZero() throws Exception {
        String str = PREFIX + "testlistenerseekafterpollzero";
        madmin.createStream(str, Streams.newStreamDescriptor());
        madmin.createTopic(str, "t", 1);
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("group.id", "testlistenerseekafterpollzero");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        TopicPartition topicPartition = new TopicPartition(str + ":t", 0);
        try {
            try {
                kafkaConsumer.subscribe(Arrays.asList(str + ":t"));
                kafkaConsumer.poll(0L);
                kafkaConsumer.seek(topicPartition, 1L);
                _logger.debug("Seek after poll with zero timeout succeeded");
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
                madmin.deleteStream(str);
            } catch (Exception e) {
                System.err.println(e);
                Assert.assertTrue(false);
                kafkaConsumer.unsubscribe();
                kafkaConsumer.close();
                madmin.deleteStream(str);
            }
        } catch (Throwable th) {
            kafkaConsumer.unsubscribe();
            kafkaConsumer.close();
            madmin.deleteStream(str);
            throw th;
        }
    }

    @Test
    public void testListenerWithNullCommitted() throws Exception {
        String str = PREFIX + "listenerwithnullcommit";
        madmin.createStream(str, Streams.newStreamDescriptor());
        TopicPartition topicPartition = new TopicPartition(str + ":t", 0);
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("fetch.min.bytes", "1");
        properties.put("enable.auto.commit", false);
        properties.put("auto.offset.reset", "earliest");
        properties.put("streams.negativeoffset.record.on.eof", "true");
        properties.put("group.id", "testnullcommitted");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        ArrayList arrayList = new ArrayList();
        arrayList.add(str + ":t");
        kafkaConsumer.subscribe(arrayList);
        try {
            Thread.sleep(2000L);
        } catch (Exception e) {
        }
        Exception exc = null;
        try {
            kafkaConsumer.committed(topicPartition);
        } catch (Exception e2) {
            exc = e2;
        }
        Assert.assertTrue(exc instanceof UnknownTopicOrPartitionException);
        madmin.createTopic(str, "t", 1);
        try {
            Thread.sleep(2000L);
        } catch (Exception e3) {
        }
        Assert.assertTrue(kafkaConsumer.committed(topicPartition) == null);
    }

    @Test
    public void testListenerBug24483() throws Exception {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("streams.consumer.default.stream", "/streams");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.offset.reset", "latest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        try {
            Runtime.getRuntime().exec("maprcli stream create -path /streams").waitFor();
            Runtime.getRuntime().exec("maprcli stream topic create -path /streams -topic topic1").waitFor();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        Map listTopics = kafkaConsumer.listTopics();
        try {
            Runtime.getRuntime().exec("maprcli stream delete -path /streams").waitFor();
            Runtime.getRuntime().exec("maprcli stream create -path /streams").waitFor();
            Runtime.getRuntime().exec("maprcli stream topic create -path /streams -topic topic1").waitFor();
        } catch (IOException e3) {
            e3.printStackTrace();
        } catch (InterruptedException e4) {
            e4.printStackTrace();
        }
        Map listTopics2 = kafkaConsumer.listTopics();
        Assert.assertTrue(listTopics.size() == listTopics2.size());
        Assert.assertTrue(listTopics2.size() == kafkaConsumer.listTopics().size());
    }
}
