package com.mapr.streams.tests.listener;

import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/BasicListenerTest.class */
public class BasicListenerTest extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(BasicListenerTest.class);
    private static final String PREFIX = "/jtest-" + BasicListenerTest.class.getSimpleName() + "-";
    private static MarlinAdmin madmin;
    private static final int numParts = 4;

    /* loaded from: input_file:com/mapr/streams/tests/listener/BasicListenerTest$ConsumeMessages.class */
    /**
     * Runnable that reads one topic through the supplied consumer and records whether exactly
     * {@code numPartitions * numMsgsPerPartition} messages arrived within the allotted polls.
     *
     * <p>The consumer is closed by {@link #run()} on every exit path, so callers must not reuse
     * or close it themselves. {@link #hasPassed} is written only by {@link #run()}; the tests in
     * this file read it after joining the thread that executed the runnable.
     */
    public class ConsumeMessages implements Runnable {
        private final String streamTopicName;
        private final int numPartitions;
        private final int numMsgsPerPartition;
        // Wildcard type instead of a raw type so assignment() yields Set<TopicPartition>.
        private final KafkaConsumer<?, ?> consumer;
        private final int numPolls;
        private final int expectedNumMsgs;
        private final boolean seekToEnd;

        /** Set to {@code true} only when {@link #run()} received exactly the expected count. */
        public boolean hasPassed;

        /**
         * @param kafkaConsumer       consumer to read with; closed at the end of {@link #run()}
         * @param streamTopicName     topic name passed to {@code subscribe}/{@code TopicPartition}
         * @param numPartitions       number of partitions (ids {@code 0..numPartitions-1}) to assign
         *                            when seeking to end; also a factor of the expected count
         * @param numMsgsPerPartition expected messages per partition
         * @param numPolls            maximum number of one-second polls to issue
         * @param seekToEnd           when {@code true}, assign partitions explicitly and seek each
         *                            one to its end before polling; otherwise subscribe to the topic
         */
        public ConsumeMessages(KafkaConsumer<?, ?> kafkaConsumer, String streamTopicName, int numPartitions,
                int numMsgsPerPartition, int numPolls, boolean seekToEnd) {
            this.streamTopicName = streamTopicName;
            this.numPartitions = numPartitions;
            this.numMsgsPerPartition = numMsgsPerPartition;
            this.consumer = kafkaConsumer;
            this.numPolls = numPolls;
            this.expectedNumMsgs = this.numMsgsPerPartition * this.numPartitions;
            this.seekToEnd = seekToEnd;
        }

        /**
         * Polls until the expected number of messages has been seen or the poll budget is spent.
         *
         * @throws RuntimeException if the number of received messages differs from the expected one
         */
        @Override // java.lang.Runnable
        public void run() {
            this.hasPassed = false;
            int received = 0;
            try {
                if (this.seekToEnd) {
                    System.out.println("Seeking to end");
                    ArrayList<TopicPartition> partitions = new ArrayList<>();
                    for (int partition = 0; partition < this.numPartitions; partition++) {
                        partitions.add(new TopicPartition(this.streamTopicName, partition));
                    }
                    this.consumer.assign(partitions);
                    for (TopicPartition topicPartition : this.consumer.assignment()) {
                        this.consumer.seekToEnd(new TopicPartition[]{topicPartition});
                        System.out.println("Subscribed to " + topicPartition.topic() + " partition:" + topicPartition.partition() + " position:" + this.consumer.position(topicPartition));
                    }
                } else {
                    ArrayList<String> topics = new ArrayList<>();
                    topics.add(this.streamTopicName);
                    this.consumer.subscribe(topics);
                }
                for (int poll = 0; poll < this.numPolls; poll++) {
                    received += this.consumer.poll(1000L).count();
                    if (received == this.expectedNumMsgs) {
                        break;
                    }
                }
                BasicListenerTest._logger.info("Msgs recived " + received + ", expected " + this.expectedNumMsgs);
            } finally {
                // Release the consumer even when assign/subscribe/poll throws.
                this.consumer.close();
            }
            if (received != this.expectedNumMsgs) {
                throw new RuntimeException("Num msgs received " + received + ", expected " + this.expectedNumMsgs);
            }
            this.hasPassed = true;
        }
    }

    /**
     * Creates the shared {@link MarlinAdmin} and removes any streams that a previous run of this
     * class may have left behind, so every test can create its streams from scratch.
     *
     * <p>Deletion is best-effort: a failure (for example because the stream does not exist) is
     * logged at debug level and otherwise ignored.
     *
     * @throws Exception if the admin client cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = new MarlinAdmin(new Configuration());
        removeLeftoverStreams(PREFIX + "LGtopicSubscr", 2);
        removeLeftoverStreams(PREFIX + "topicSubscr", 2);
        removeLeftoverStreams(PREFIX + "LGMultiListener", 2);
        removeLeftoverStreams(PREFIX + "singlestream", 1);
        removeLeftoverStreams(PREFIX + "multistream", numParts);
        removeLeftoverStreams(PREFIX + "multistreameh", numParts);
        removeLeftoverStreams(PREFIX + "pollfetchsize", 1);
        removeLeftoverStreams(PREFIX + "topicSubscrHundred", 2);
        removeLeftoverStreams(PREFIX + "LGtopicSubscrHundred", 2);
        removeLeftoverStream(PREFIX + "listenerbeforemsgs");
        removeLeftoverStream(PREFIX + "listenerseektoend");
    }

    /** Best-effort delete of the streams {@code base + 0 .. base + (count - 1)}. */
    private static void removeLeftoverStreams(String base, int count) {
        for (int index = 0; index < count; index++) {
            removeLeftoverStream(base + index);
        }
    }

    /** Best-effort delete of a single stream; any failure is logged at debug level and ignored. */
    private static void removeLeftoverStream(String streamPath) {
        try {
            madmin.deleteStream(streamPath);
        } catch (Exception e) {
            // Deliberately tolerated: the stream usually just does not exist yet.
            _logger.debug("Ignoring failure to delete leftover stream " + streamPath, e);
        }
    }

    /**
     * Creates two streams with {@code numParts} default partitions, runs the producer test and
     * then the listener test against them, and deletes the streams afterwards.
     *
     * <p>NOTE(review): the meaning of the remaining {@code runTest} arguments is defined in
     * {@code Producer}/{@code Listener}, not in this file.
     */
    @Test
    public void testTopicSubscription() throws IOException {
        final int streamCount = 2;
        final String streamBase = PREFIX + "topicSubscr";
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        int created = 0;
        while (created < streamCount) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        boolean produced = Producer.runTest(streamBase, streamCount, 2, 0, numParts, 10000, true);
        Assert.assertTrue(produced);
        boolean listened = Listener.runTest(streamBase, streamCount, 2, 0, numParts, 10000, true, false, null);
        Assert.assertTrue(listened);

        int deleted = 0;
        while (deleted < streamCount) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Same flow as the plain topic-subscription test, but with 111 topics per stream (per the
     * progress messages printed below): create two streams, produce, listen, delete.
     */
    @Test
    public void testTopicHundredSubscription() throws IOException {
        final int streamCount = 2;
        final int topicsPerStream = 111;
        final String streamBase = PREFIX + "topicSubscrHundred";
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        System.err.println("creating 2 streams");
        int created = 0;
        while (created < streamCount) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        System.err.println("starting producer on 111 topics per streams");
        boolean produced = Producer.runTest(streamBase, streamCount, topicsPerStream, 0, numParts, 1000, true);
        Assert.assertTrue(produced);

        System.err.println("starting listener on 111 topics per streams");
        boolean listened =
                Listener.runTest(streamBase, streamCount, topicsPerStream, 0, numParts, 1000, true, false, null);
        Assert.assertTrue(listened);

        int deleted = 0;
        while (deleted < streamCount) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Topic-subscription flow where the listener is given the group name {@code "LGTopicTest"}
     * instead of {@code null}: create two streams, produce, listen, delete.
     */
    @Test
    public void testLGTopicSubscription() throws IOException {
        final int streamCount = 2;
        final String groupName = "LGTopicTest";
        final String streamBase = PREFIX + "LGtopicSubscr";
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        int created = 0;
        while (created < streamCount) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        boolean produced = Producer.runTest(streamBase, streamCount, 2, 0, numParts, 1000, true);
        Assert.assertTrue(produced);
        boolean listened = Listener.runTest(streamBase, streamCount, 2, 0, numParts, 1000, true, false, groupName);
        Assert.assertTrue(listened);

        int deleted = 0;
        while (deleted < streamCount) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Listener-group variant of the 111-topics-per-stream test: create two streams, produce,
     * listen with the group name {@code "LGTopicTest"}, delete.
     */
    @Test
    public void testLGTopicHundredSubscription() throws IOException {
        final int streamCount = 2;
        final int topicsPerStream = 111;
        final String groupName = "LGTopicTest";
        final String streamBase = PREFIX + "LGtopicSubscrHundred";
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        System.err.println("creating 2 streams");
        int created = 0;
        while (created < streamCount) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        System.err.println("starting producer on 111 topics per streams");
        boolean produced = Producer.runTest(streamBase, streamCount, topicsPerStream, 0, numParts, 1000, true);
        Assert.assertTrue(produced);

        System.err.println("starting listener group on 111 topics per streams");
        boolean listened =
                Listener.runTest(streamBase, streamCount, topicsPerStream, 0, numParts, 1000, true, false, groupName);
        Assert.assertTrue(listened);

        int deleted = 0;
        while (deleted < streamCount) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Creates two streams with seven default partitions each, runs
     * {@code Listener.runLGTest} with the group name {@code "LGTest"}, and deletes the streams.
     *
     * <p>NOTE(review): the {@code 7} passed to {@code runLGTest} matches the partition count set
     * on the descriptor; whether it denotes partitions is defined in {@code Listener} — confirm.
     */
    @Test
    public void testLGMultipleListeners() throws IOException {
        final int streamCount = 2;
        final String streamBase = PREFIX + "LGMultiListener";
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(7);

        int created = 0;
        while (created < streamCount) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        boolean passed = Listener.runLGTest(streamBase, streamCount, 2, 7, true, "LGTest");
        Assert.assertTrue(passed);

        int deleted = 0;
        while (deleted < streamCount) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Exercises producer and listener against a single stream in three steps:
     * <ol>
     *   <li>produce and consume 10000 messages;</li>
     *   <li>run the listener expecting 20000 messages and require that it fails with an
     *       {@link IOException};</li>
     *   <li>recreate the stream and repeat the produce/consume round with the
     *       "slowly generated" settings (fourth argument 2, 100000 messages).</li>
     * </ol>
     */
    @Test
    public void testSingleStream() throws IOException {
        String streamBase = PREFIX + "singlestream";
        String streamPath = streamBase + 0;
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        madmin.createStream(streamPath, descriptor);

        _logger.info("Populate stream and check listener");
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTest(streamBase, 1, 2, 0, numParts, 10000, "TPGrp"));

        _logger.info("test listener incorrect expected nummsgs");
        Exception caught = null;
        try {
            Listener.runTest(streamBase, 1, 2, 0, numParts, 20000, "TPGrp");
        } catch (Exception e) {
            caught = e;
        }
        Assert.assertTrue("Expected an IOException for a wrong expected message count but got: " + caught,
                caught instanceof IOException);

        _logger.info("test with topics where message are generated slowly");
        madmin.deleteStream(streamPath);
        // Pause before recreating the stream under the same name.
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTest(streamBase, 1, 2, 2, numParts, 100000, "TPGrp"));
        madmin.deleteStream(streamPath);
    }

    /**
     * Runs the producer/listener round against {@code numParts} streams twice: first with 10000
     * messages, then — after deleting and recreating the streams — with the "slowly generated"
     * settings (fourth argument 2, 100000 messages).
     */
    @Test
    public void testMultipleStream() throws IOException {
        String streamBase = PREFIX + "multistream";
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < numParts; i++) {
            madmin.createStream(streamBase + i, descriptor);
        }

        _logger.info("Populate stream and check listener");
        Assert.assertTrue(Producer.runTest(streamBase, numParts, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTest(streamBase, numParts, 2, 0, numParts, 10000));

        _logger.info("test with topics where message are generated slowly");
        for (int i = 0; i < numParts; i++) {
            madmin.deleteStream(streamBase + i);
        }
        // Pause before recreating the streams under the same names.
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        for (int i = 0; i < numParts; i++) {
            madmin.createStream(streamBase + i, descriptor);
        }
        Assert.assertTrue(Producer.runTest(streamBase, numParts, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTest(streamBase, numParts, 2, 2, numParts, 100000));
        for (int i = 0; i < numParts; i++) {
            madmin.deleteStream(streamBase + i);
        }
    }

    /**
     * Verifies the errors surfaced by the listener when streams are disturbed while it polls.
     *
     * <p>Scenario 1 runs the listener with {@code TED_ACTION.kDeleteStream} and requires a
     * {@link NoOffsetForPartitionException} or an {@link IOException}. Scenario 2 (skipped when
     * the JVM user is {@code root}) runs it with {@code TED_ACTION.kChangePerm} and requires a
     * {@link NoOffsetForPartitionException} whose text contains "Permission denied".
     */
    @Test
    public void testMultipleStreamErrorHandling() throws IOException {
        String streamBase = PREFIX + "multistreameh";
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        for (int i = 0; i < numParts; i++) {
            madmin.createStream(streamBase + i, descriptor);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(streamBase, numParts, 2, 0, numParts, 100000, false));

        _logger.info("test listener stream delete while polling");
        Exception deleteError = null;
        try {
            Listener.runTest(streamBase, numParts, 2, 0, numParts, 100000, Listener.TED_ACTION.kDeleteStream);
            System.out.println("Test completed without errrosr !!");
        } catch (Exception e) {
            System.out.println("Got an exception" + e);
            deleteError = e;
        }
        Assert.assertTrue("Expected NoOffsetForPartitionException or IOException but got: " + deleteError,
                (deleteError instanceof NoOffsetForPartitionException) || (deleteError instanceof IOException));
        for (int i = 0; i < numParts; i++) {
            try {
                madmin.deleteStream(streamBase + i);
            } catch (Exception ignored) {
                // Best-effort cleanup: the stream may already be gone after the delete scenario.
            }
        }

        // Constant-first comparison: no NullPointerException when "user.name" is unset.
        // NOTE(review): presumably skipped for root because root is not subject to the
        // permission change — confirm.
        if ("root".equals(System.getProperty("user.name"))) {
            return;
        }

        for (int i = 0; i < numParts; i++) {
            madmin.createStream(streamBase + i, descriptor);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(streamBase, numParts, 2, 0, numParts, 100000, false));

        _logger.info("test listener stream changeperm while polling");
        Exception permError = null;
        try {
            Listener.runTest(streamBase, numParts, 2, 0, numParts, 100000, Listener.TED_ACTION.kChangePerm);
        } catch (Exception e) {
            permError = e;
        }
        Assert.assertTrue("Expected NoOffsetForPartitionException but got: " + permError,
                permError instanceof NoOffsetForPartitionException);
        Assert.assertTrue("Expected \"Permission denied\" in: " + permError,
                permError.toString().contains("Permission denied"));
        for (int i = 0; i < numParts; i++) {
            try {
                madmin.deleteStream(streamBase + i);
            } catch (Exception ignored) {
                // Best-effort cleanup after the permission scenario.
            }
        }
    }

    /**
     * Runs produce/listen rounds on one stream while varying the last argument of
     * {@code Listener.runTestWithPollOptions} (1024, 10240, and finally 108).
     *
     * <p>The first four rounds must succeed. The last round (value 108, logged as "max fetch
     * size lesser than msg size") must fail with a {@link RecordTooLargeException}. The stream
     * is deleted and recreated between rounds, with a ten-second pause after each deletion.
     */
    @Test
    public void testPollWithVaryingFetchSize() throws IOException {
        String streamBase = PREFIX + "pollfetchsize";
        String streamPath = streamBase + 0;
        MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        _logger.info("Populate stream and check listener");
        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 0, numParts, 10000, 1024));
        madmin.deleteStream(streamPath);
        pauseBetweenFetchSizeRounds();

        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 0, numParts, 10000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 0, numParts, 10000, 10240));
        madmin.deleteStream(streamPath);
        pauseBetweenFetchSizeRounds();

        _logger.info("test with topics where message are generated slowly");
        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 2, numParts, 100000, 1024));
        madmin.deleteStream(streamPath);
        pauseBetweenFetchSizeRounds();

        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 2, numParts, 100000, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 2, numParts, 100000, 10240));
        madmin.deleteStream(streamPath);
        pauseBetweenFetchSizeRounds();

        _logger.info("test with max fetch size lesser than msg size");
        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, 2, numParts, 100000, false));
        Exception caught = null;
        try {
            Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 2, numParts, 100000, 108));
            _logger.info("Test completed without errors !! It was expected to throw exception");
        } catch (Exception e) {
            _logger.info("Hit exception " + e);
            caught = e;
        }
        Assert.assertTrue("Expected RecordTooLargeException but got: " + caught,
                caught instanceof RecordTooLargeException);
        madmin.deleteStream(streamPath);
    }

    /** Sleeps ten seconds after a stream deletion, preserving the thread's interrupt status. */
    private static void pauseBetweenFetchSizeRounds() {
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the test runner instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * For topics {@code :t1 .. :t(numParts-1)} of one stream, starts a producer thread that sends
     * 100 messages with {@code streams.buffer.max.time.ms=10000}, waits two seconds, then starts
     * a consumer thread (20 polls, subscribing from the earliest offset) and requires that it
     * receives all 100 messages.
     *
     * <p>NOTE(review): the producer is not closed here; whether {@code SendMessagesToProducer}
     * closes it is not visible in this file — confirm ownership.
     *
     * @throws Exception if stream administration fails or the test thread is interrupted
     */
    @Test
    public void testListenerBeforeMsgsAreFlushed() throws Exception {
        final int numMsgs = 100;
        String streamPath = PREFIX + "listenerbeforemsgs";
        madmin.createStream(streamPath, new MStreamDescriptor());
        for (int i = 1; i < numParts; i++) {
            String topic = streamPath + ":t" + i;

            Properties producerProps = new Properties();
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("streams.buffer.max.time.ms", "10000");
            // Left raw: the parameter type expected by SendMessagesToProducer is declared elsewhere.
            KafkaProducer kafkaProducer = new KafkaProducer(producerProps);

            Properties consumerProps = new Properties();
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("fetch.min.bytes", "1");
            consumerProps.put("auto.offset.reset", "earliest");
            KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);

            SendMessagesToProducer sender = new SendMessagesToProducer(
                    kafkaProducer, new ProducerMultiTest.CountCallback(numMsgs), topic, 1, numMsgs);
            ConsumeMessages receiver = new ConsumeMessages(kafkaConsumer, topic, 1, numMsgs, 20, false);

            Thread senderThread = new Thread(sender);
            senderThread.start();
            // Give the producer a head start so the listener begins before buffered messages flush.
            try {
                Thread.sleep(2000L);
            } catch (InterruptedException e) {
                System.out.println("Sleep interrupted " + e);
                // Restore the flag; the joins below will then surface the interruption.
                Thread.currentThread().interrupt();
            }
            Thread receiverThread = new Thread(receiver);
            receiverThread.start();
            senderThread.join();
            receiverThread.join();
            Assert.assertTrue("Consumer did not receive the expected messages on " + topic, receiver.hasPassed);
        }
        madmin.deleteStream(streamPath);
    }

    /**
     * Starts a consumer thread that assigns partition 0 of topic {@code :t} and seeks to its end,
     * waits two seconds, then starts a producer thread that sends 10 messages, and requires that
     * the consumer receives all 10 within 20 polls.
     *
     * <p>NOTE(review): the producer is not closed here; whether {@code SendMessagesToProducer}
     * closes it is not visible in this file — confirm ownership.
     *
     * @throws Exception if stream administration fails or the test thread is interrupted
     */
    @Test
    public void testListenerWithSeekToEnd() throws Exception {
        final int numMsgs = 10;
        String streamPath = PREFIX + "listenerseektoend";
        String topic = streamPath + ":t";
        madmin.createStream(streamPath, new MStreamDescriptor());

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Left raw: the parameter type expected by SendMessagesToProducer is declared elsewhere.
        KafkaProducer kafkaProducer = new KafkaProducer(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);

        madmin.createTopic(topic, numParts);
        SendMessagesToProducer sender = new SendMessagesToProducer(
                kafkaProducer, new ProducerMultiTest.CountCallback(numMsgs), topic, 1, numMsgs);
        ConsumeMessages receiver = new ConsumeMessages(kafkaConsumer, topic, 1, numMsgs, 20, true);

        Thread senderThread = new Thread(sender);
        Thread receiverThread = new Thread(receiver);
        receiverThread.start();
        // Let the consumer seek to the end before any message is produced.
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            System.out.println("Sleep interrupted " + e);
            // Restore the flag; the joins below will then surface the interruption.
            Thread.currentThread().interrupt();
        }
        senderThread.start();
        senderThread.join();
        receiverThread.join();
        Assert.assertTrue("Consumer did not receive the expected messages on " + topic, receiver.hasPassed);
        madmin.deleteStream(streamPath);
    }
}
