/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class BasicListenerTest
extends BaseTest {
    /** Logger used by the test methods and by the nested {@link ConsumeMessages} worker. */
    private static final Logger _logger = LoggerFactory.getLogger(BasicListenerTest.class);
    /** Path prefix shared by every stream this class creates: {@code /jtest-BasicListenerTest-}. */
    private static final String PREFIX = "/jtest-" + BasicListenerTest.class.getSimpleName() + "-";
    /** Admin client; assigned once in {@link #setupTest()} and used by every test to create and delete streams. */
    private static MarlinAdmin madmin;
    /*
     * NOTE(review): no code in this file reads this field by name (the two locals called
     * numParts shadow it). Presumably the compiler inlined it into the literal 4s that the
     * tests pass around - confirm against the original source before removing.
     */
    private static final int numParts = 4;

    /**
     * Creates the shared {@link MarlinAdmin} and removes every stream a previous run of this
     * class may have left behind, so that the {@code createStream} calls in the tests start
     * from a clean slate.
     *
     * <p>Deletion is best effort: a failure to delete one stream (typically because it does
     * not exist) is logged at debug level and never prevents the remaining streams from being
     * attempted.
     *
     * @throws Exception if the Hadoop configuration or the admin client cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = new MarlinAdmin(new Configuration());

        // Streams named <PREFIX><base><index>, index in [0, count).
        deleteStreamsQuietly(PREFIX + "LGtopicSubscr", 2);
        deleteStreamsQuietly(PREFIX + "topicSubscr", 2);
        deleteStreamsQuietly(PREFIX + "LGMultiListener", 2);
        deleteStreamsQuietly(PREFIX + "singlestream", 1);
        deleteStreamsQuietly(PREFIX + "multistream", 4);
        deleteStreamsQuietly(PREFIX + "multistreameh", 4);
        deleteStreamsQuietly(PREFIX + "pollfetchsize", 1);
        // Previously a single try wrapped the whole loop for the next two bases, so a failure
        // on index 0 skipped index 1; each index is now attempted independently.
        deleteStreamsQuietly(PREFIX + "topicSubscrHundred", 2);
        deleteStreamsQuietly(PREFIX + "LGtopicSubscrHundred", 2);

        // Streams whose name carries no numeric suffix.
        deleteStreamQuietly(PREFIX + "listenerbeforemsgs");
        deleteStreamQuietly(PREFIX + "listenerseektoend");
    }

    /** Best-effort deletion of {@code baseName + 0} through {@code baseName + (count - 1)}. */
    private static void deleteStreamsQuietly(String baseName, int count) {
        for (int i = 0; i < count; ++i) {
            deleteStreamQuietly(baseName + i);
        }
    }

    /** Deletes one stream, logging (rather than propagating) any failure. */
    private static void deleteStreamQuietly(String streamName) {
        try {
            madmin.deleteStream(streamName);
        }
        catch (Exception e) {
            // Expected when the stream was never created or was already cleaned up.
            _logger.debug("Ignoring failure to delete stream " + streamName + " during setup", e);
        }
    }

    /**
     * Creates {@code <PREFIX>topicSubscr0} and {@code <PREFIX>topicSubscr1} with four default
     * partitions, runs {@code Producer.runTest} and then {@code Listener.runTest} against them
     * with matching arguments, requires both to return {@code true}, and deletes the streams.
     *
     * <p>The final {@code Listener.runTest} argument is {@code null} here; the "LG" variants of
     * this test pass a non-null string in that position.
     *
     * @throws IOException if stream administration or one of the helper runs fails
     */
    @Test
    public void testTopicSubscription() throws IOException {
        final String streamBase = PREFIX + "topicSubscr";
        final int streamCount = 2;
        final boolean ordered = false;

        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);
        for (int idx = 0; idx < streamCount; idx++) {
            madmin.createStream(streamBase + idx, descriptor);
        }

        boolean produced = Producer.runTest(streamBase, streamCount, 2, 0, 4, 10000, !ordered);
        Assert.assertTrue(produced);
        boolean consumed = Listener.runTest(streamBase, streamCount, 2, 0, 4, 10000, true, ordered, null);
        Assert.assertTrue(consumed);

        for (int idx = 0; idx < streamCount; idx++) {
            madmin.deleteStream(streamBase + idx);
        }
    }

    /**
     * Same flow as the plain topic-subscription test, but against
     * {@code <PREFIX>topicSubscrHundred0/1} and with 111 as the third helper argument (the
     * progress messages describe it as "111 topics per streams") and 1000 as the sixth.
     * Progress is reported on {@code System.err}.
     *
     * @throws IOException if stream administration or one of the helper runs fails
     */
    @Test
    public void testTopicHundredSubscription() throws IOException {
        final String streamBase = PREFIX + "topicSubscrHundred";
        final int numStreams = 2;
        final boolean ordered = false;

        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);

        System.err.println("creating " + numStreams + " streams");
        int created = 0;
        while (created < numStreams) {
            madmin.createStream(streamBase + created, descriptor);
            created++;
        }

        System.err.println("starting producer on 111 topics per streams");
        boolean produced = Producer.runTest(streamBase, numStreams, 111, 0, 4, 1000, !ordered);
        Assert.assertTrue(produced);

        System.err.println("starting listener on 111 topics per streams");
        boolean consumed = Listener.runTest(streamBase, numStreams, 111, 0, 4, 1000, true, ordered, null);
        Assert.assertTrue(consumed);

        int deleted = 0;
        while (deleted < numStreams) {
            madmin.deleteStream(streamBase + deleted);
            deleted++;
        }
    }

    /**
     * Creates {@code <PREFIX>LGtopicSubscr0/1} with four default partitions, runs
     * {@code Producer.runTest} and then {@code Listener.runTest} with {@code "LGTopicTest"} as
     * the final listener argument, requires both to return {@code true}, and deletes the
     * streams.
     *
     * @throws IOException if stream administration or one of the helper runs fails
     */
    @Test
    public void testLGTopicSubscription() throws IOException {
        final String streamBase = PREFIX + "LGtopicSubscr";
        final int streamCount = 2;
        final boolean ordered = false;
        // NOTE(review): presumably a listener-group name - confirm against Listener.runTest.
        final String lastListenerArg = "LGTopicTest";

        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);
        for (int n = 0; n != streamCount; n++) {
            madmin.createStream(streamBase + n, descriptor);
        }

        boolean produced = Producer.runTest(streamBase, streamCount, 2, 0, 4, 1000, !ordered);
        Assert.assertTrue(produced);
        boolean consumed = Listener.runTest(streamBase, streamCount, 2, 0, 4, 1000, true, ordered, lastListenerArg);
        Assert.assertTrue(consumed);

        for (int n = 0; n != streamCount; n++) {
            madmin.deleteStream(streamBase + n);
        }
    }

    /**
     * Variant of the "LG" topic-subscription test against
     * {@code <PREFIX>LGtopicSubscrHundred0/1}, passing 111 as the third helper argument (the
     * progress messages describe it as "111 topics per streams") and {@code "LGTopicTest"} as
     * the final listener argument. Progress is reported on {@code System.err}.
     *
     * @throws IOException if stream administration or one of the helper runs fails
     */
    @Test
    public void testLGTopicHundredSubscription() throws IOException {
        final String streamBase = PREFIX + "LGtopicSubscrHundred";
        final int numStreams = 2;
        final boolean ordered = false;

        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);

        System.err.println("creating " + numStreams + " streams");
        for (int k = 0; k < numStreams; k += 1) {
            final String streamPath = streamBase + k;
            madmin.createStream(streamPath, descriptor);
        }

        System.err.println("starting producer on 111 topics per streams");
        boolean produced = Producer.runTest(streamBase, numStreams, 111, 0, 4, 1000, !ordered);
        Assert.assertTrue(produced);

        System.err.println("starting listener group on 111 topics per streams");
        boolean consumed = Listener.runTest(streamBase, numStreams, 111, 0, 4, 1000, true, ordered, "LGTopicTest");
        Assert.assertTrue(consumed);

        for (int k = 0; k < numStreams; k += 1) {
            final String streamPath = streamBase + k;
            madmin.deleteStream(streamPath);
        }
    }

    /**
     * Creates {@code <PREFIX>LGMultiListener0/1} with seven default partitions, requires
     * {@code Listener.runLGTest} to return {@code true} for them, and deletes the streams.
     * No producer is started by this method itself.
     *
     * @throws IOException if stream administration or the listener run fails
     */
    @Test
    public void testLGMultipleListeners() throws IOException {
        final String streamBase = PREFIX + "LGMultiListener";
        final int streamCount = 2;
        final int partitions = 7;

        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(partitions);
        for (int idx = 0; idx < streamCount; idx++) {
            madmin.createStream(streamBase + idx, descriptor);
        }

        boolean listenersPassed = Listener.runLGTest(streamBase, streamCount, 2, partitions, true, "LGTest");
        Assert.assertTrue(listenersPassed);

        for (int idx = 0; idx < streamCount; idx++) {
            madmin.deleteStream(streamBase + idx);
        }
    }

    /**
     * Exercises producer and listener against the single stream {@code <PREFIX>singlestream0}
     * in three phases:
     * <ol>
     *   <li>produce and listen with matching arguments; both helpers must return {@code true};</li>
     *   <li>listen again with 20000 instead of 10000 as the sixth argument; the call must
     *       throw an {@link IOException};</li>
     *   <li>delete the stream, wait ten seconds, recreate it, and repeat phase one with 2 as
     *       the fourth argument and 100000 as the sixth.</li>
     * </ol>
     *
     * @throws IOException if stream administration or an unguarded helper run fails
     */
    @Test
    public void testSingleStream() throws IOException {
        final String streamBase = PREFIX + "singlestream";
        final String streamPath = streamBase + 0;
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);

        // Phase 1: happy path.
        madmin.createStream(streamPath, descriptor);
        _logger.info("Populate stream and check listener");
        boolean produced = Producer.runTest(streamBase, 1, 2, 0, 4, 10000, false);
        Assert.assertTrue(produced);
        boolean consumed = Listener.runTest(streamBase, 1, 2, 0, 4, 10000, "TPGrp");
        Assert.assertTrue(consumed);

        // Phase 2: the listener is told to expect a different count than was produced.
        _logger.info("test listener incorrect expected nummsgs");
        Exception caught = null;
        try {
            Listener.runTest(streamBase, 1, 2, 0, 4, 20000, "TPGrp");
        }
        catch (Exception e) {
            caught = e;
        }
        Assert.assertTrue(caught instanceof IOException);

        // Phase 3: recreate the stream and rerun with the "slow" arguments.
        _logger.info("test with topics where message are generated slowly");
        madmin.deleteStream(streamPath);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception ignored) {
            // the pause between delete and recreate is best effort
        }
        madmin.createStream(streamPath, descriptor);
        boolean producedSlow = Producer.runTest(streamBase, 1, 2, 2, 4, 100000, false);
        Assert.assertTrue(producedSlow);
        boolean consumedSlow = Listener.runTest(streamBase, 1, 2, 2, 4, 100000, "TPGrp");
        Assert.assertTrue(consumedSlow);
        madmin.deleteStream(streamPath);
    }

    /**
     * Runs the produce/listen round trip across four streams
     * ({@code <PREFIX>multistream0..3}) twice: first with 0 as the fourth helper argument and
     * 10000 as the sixth, then - after deleting the streams, pausing ten seconds and
     * recreating them - with 2 and 100000. Every helper call must return {@code true}.
     *
     * @throws IOException if stream administration or one of the helper runs fails
     */
    @Test
    public void testMultipleStream() throws IOException {
        final String streamBase = PREFIX + "multistream";
        final int streamCount = 4;
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);

        for (int idx = 0; idx < streamCount; idx++) {
            madmin.createStream(streamBase + idx, descriptor);
        }
        _logger.info("Populate stream and check listener");
        boolean produced = Producer.runTest(streamBase, streamCount, 2, 0, 4, 10000, false);
        Assert.assertTrue(produced);
        boolean consumed = Listener.runTest(streamBase, streamCount, 2, 0, 4, 10000);
        Assert.assertTrue(consumed);

        _logger.info("test with topics where message are generated slowly");
        for (int idx = 0; idx < streamCount; idx++) {
            madmin.deleteStream(streamBase + idx);
        }
        try {
            Thread.sleep(10000L);
        }
        catch (Exception ignored) {
            // the pause between delete and recreate is best effort
        }
        for (int idx = 0; idx < streamCount; idx++) {
            madmin.createStream(streamBase + idx, descriptor);
        }
        boolean producedSlow = Producer.runTest(streamBase, streamCount, 2, 2, 4, 100000, false);
        Assert.assertTrue(producedSlow);
        boolean consumedSlow = Listener.runTest(streamBase, streamCount, 2, 2, 4, 100000);
        Assert.assertTrue(consumedSlow);

        for (int idx = 0; idx < streamCount; idx++) {
            madmin.deleteStream(streamBase + idx);
        }
    }

    /**
     * Checks which exception the listener surfaces when the streams it is polling are
     * disturbed mid-run.
     *
     * <p>Scenario 1 runs the listener with {@code TED_ACTION.kDeleteStream} and requires a
     * {@link NoOffsetForPartitionException} or an {@link IOException}. Scenario 2 runs it with
     * {@code TED_ACTION.kChangePerm} and requires a {@link NoOffsetForPartitionException}
     * whose string form contains {@code "Permission denied"}. Scenario 2 is skipped when the
     * JVM user is {@code root}.
     *
     * <p>Each assertion message includes the exception that was actually caught (or
     * {@code null}), so a failure can be diagnosed from the test report alone.
     *
     * @throws IOException if stream creation or the producer run fails
     */
    @Test
    public void testMultipleStreamErrorHandling() throws IOException {
        final String sname = PREFIX + "multistreameh";
        final int numStreams = 4;
        final MStreamDescriptor sdesc = new MStreamDescriptor();
        sdesc.setDefaultPartitions(4);

        // Scenario 1: streams are deleted while the listener is polling.
        for (int i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(sname, numStreams, 2, 0, 4, 100000, false));
        _logger.info("test listener stream delete while polling");
        Exception ex = null;
        try {
            Listener.runTest(sname, numStreams, 2, 0, 4, 100000, Listener.TED_ACTION.kDeleteStream);
            System.out.println("Test completed without errrosr !!");
        }
        catch (Exception e) {
            System.out.println("Got an exception" + e);
            ex = e;
        }
        Assert.assertTrue(
            "expected NoOffsetForPartitionException or IOException after stream delete, got: " + ex,
            ex instanceof NoOffsetForPartitionException || ex instanceof IOException);
        dropErrorHandlingStreams(sname, numStreams);

        // Constant-first equals: a missing user.name property must not raise an NPE.
        // NOTE(review): presumably skipped because root is not subject to the permission
        // change - confirm.
        if ("root".equals(System.getProperty("user.name"))) {
            _logger.info("Running as root; skipping the change-permission scenario");
            return;
        }

        // Scenario 2: stream permissions are changed while the listener is polling.
        for (int i = 0; i < numStreams; ++i) {
            madmin.createStream(sname + i, sdesc);
        }
        _logger.info("Populate stream");
        Assert.assertTrue(Producer.runTest(sname, numStreams, 2, 0, 4, 100000, false));
        _logger.info("test listener stream changeperm while polling");
        ex = null;
        try {
            Listener.runTest(sname, numStreams, 2, 0, 4, 100000, Listener.TED_ACTION.kChangePerm);
        }
        catch (Exception e) {
            ex = e;
        }
        Assert.assertTrue(
            "expected NoOffsetForPartitionException after permission change, got: " + ex,
            ex instanceof NoOffsetForPartitionException);
        // ex is non-null here: the instanceof assertion above fails first for null.
        Assert.assertTrue(
            "expected \"Permission denied\" in: " + ex,
            ex.toString().contains("Permission denied"));
        dropErrorHandlingStreams(sname, numStreams);
    }

    /**
     * Best-effort deletion of {@code baseName + 0 .. baseName + (count - 1)}; failures are
     * ignored because the scenario under test may already have removed or locked the streams.
     */
    private void dropErrorHandlingStreams(String baseName, int count) {
        for (int i = 0; i < count; ++i) {
            try {
                madmin.deleteStream(baseName + i);
            }
            catch (Exception ignored) {
                // stream may already be gone or inaccessible after the injected fault
            }
        }
    }

    /**
     * Runs {@code Listener.runTestWithPollOptions} against {@code <PREFIX>pollfetchsize0} with
     * several values of its last argument (1024 and 10240), for two argument sets, recreating
     * the stream between rounds. A final round passes 108 and must end with a
     * {@link RecordTooLargeException}.
     *
     * @throws IOException if stream administration or an unguarded helper run fails
     */
    @Test
    public void testPollWithVaryingFetchSize() throws IOException {
        final String streamBase = PREFIX + "pollfetchsize";
        final String streamPath = streamBase + 0;
        final MStreamDescriptor descriptor = new MStreamDescriptor();
        descriptor.setDefaultPartitions(4);

        _logger.info("Populate stream and check listener");
        pollFetchSizeRound(streamBase, streamPath, descriptor, 0, 10000, 1024);
        pollFetchSizeRound(streamBase, streamPath, descriptor, 0, 10000, 10240);

        _logger.info("test with topics where message are generated slowly");
        pollFetchSizeRound(streamBase, streamPath, descriptor, 2, 100000, 1024);
        pollFetchSizeRound(streamBase, streamPath, descriptor, 2, 100000, 10240);

        _logger.info("test with max fetch size lesser than msg size");
        madmin.createStream(streamPath, descriptor);
        boolean produced = Producer.runTest(streamBase, 1, 2, 2, 4, 100000, false);
        Assert.assertTrue(produced);
        Exception caught = null;
        try {
            Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, 2, 4, 100000, 108));
            _logger.info("Test completed without errors !! It was expected to throw exception");
        }
        catch (Exception e) {
            _logger.info("Hit exception " + e);
            caught = e;
        }
        Assert.assertTrue(caught instanceof RecordTooLargeException);
        madmin.deleteStream(streamPath);
    }

    /**
     * One create / produce / poll / delete / pause cycle on a single stream.
     *
     * @param streamBase name passed to the producer and listener helpers
     * @param streamPath full stream name ({@code streamBase + 0}) to create and delete
     * @param descriptor descriptor used to create the stream
     * @param fourthArg  value forwarded as the fourth helper argument (meaning not visible
     *                   here; the callers log "generated slowly" when it is 2 - confirm)
     * @param sixthArg   value forwarded as the sixth helper argument
     * @param pollOption value forwarded as the last {@code runTestWithPollOptions} argument
     */
    private void pollFetchSizeRound(String streamBase, String streamPath, MStreamDescriptor descriptor,
                                    int fourthArg, int sixthArg, int pollOption) throws IOException {
        madmin.createStream(streamPath, descriptor);
        Assert.assertTrue(Producer.runTest(streamBase, 1, 2, fourthArg, 4, sixthArg, false));
        Assert.assertTrue(Listener.runTestWithPollOptions(streamBase, 1, 2, fourthArg, 4, sixthArg, pollOption));
        madmin.deleteStream(streamPath);
        try {
            Thread.sleep(10000L);
        }
        catch (Exception ignored) {
            // the pause before the stream is recreated is best effort
        }
    }

    /**
     * For topics {@code :t1}, {@code :t2} and {@code :t3} of stream
     * {@code <PREFIX>listenerbeforemsgs}: starts a producer thread configured with
     * {@code streams.buffer.max.time.ms=10000}, waits two seconds, starts a
     * {@link ConsumeMessages} thread for the same topic, joins both, and requires the
     * consumer worker to report success. The stream is deleted afterwards.
     *
     * @throws Exception if stream administration fails or a join is interrupted
     */
    @Test
    public void testListenerBeforeMsgsAreFlushed() throws Exception {
        final String streamPath = PREFIX + "listenerbeforemsgs";
        madmin.createStream(streamPath, new MStreamDescriptor());

        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        for (int topicIdx = 1; topicIdx <= 3; topicIdx++) {
            final String fullTopic = streamPath + (":t" + topicIdx);

            Properties producerProps = new Properties();
            producerProps.setProperty("key.serializer", byteArraySerializer);
            producerProps.setProperty("value.serializer", byteArraySerializer);
            producerProps.setProperty("streams.buffer.max.time.ms", "10000");
            KafkaProducer kafkaProducer = new KafkaProducer(producerProps);

            Properties consumerProps = new Properties();
            consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
            consumerProps.setProperty("value.deserializer", byteArrayDeserializer);
            consumerProps.setProperty("fetch.min.bytes", "1");
            consumerProps.setProperty("auto.offset.reset", "earliest");
            KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProps);

            final int partitionCount = 1;
            final int msgsPerPartition = 100;
            ProducerMultiTest.CountCallback callback =
                new ProducerMultiTest.CountCallback(msgsPerPartition * partitionCount);
            SendMessagesToProducer sendTask =
                new SendMessagesToProducer(kafkaProducer, callback, fullTopic, partitionCount, msgsPerPartition);
            ConsumeMessages consumeTask =
                new ConsumeMessages(kafkaConsumer, fullTopic, partitionCount, msgsPerPartition, 20, false);

            // The producer gets a two-second head start before the consumer is launched.
            Thread producerThread = new Thread(sendTask);
            producerThread.start();
            try {
                Thread.sleep(2000L);
            }
            catch (Exception e) {
                System.out.println("Sleep interrupted " + e);
            }
            Thread consumerThread = new Thread(consumeTask);
            consumerThread.start();

            producerThread.join();
            consumerThread.join();
            Assert.assertTrue(consumeTask.hasPassed);
        }
        madmin.deleteStream(streamPath);
    }

    /**
     * Starts a {@link ConsumeMessages} worker in seek-to-end mode on topic {@code :t} of
     * stream {@code <PREFIX>listenerseektoend}, waits two seconds, then starts the producer.
     * After joining both threads the consumer worker must report success. The stream is
     * deleted afterwards.
     *
     * @throws Exception if stream/topic administration fails or a join is interrupted
     */
    @Test
    public void testListenerWithSeekToEnd() throws Exception {
        final String streamPath = PREFIX + "listenerseektoend";
        madmin.createStream(streamPath, new MStreamDescriptor());
        final String fullTopic = streamPath + ":t";

        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";

        Properties producerProps = new Properties();
        producerProps.setProperty("key.serializer", byteArraySerializer);
        producerProps.setProperty("value.serializer", byteArraySerializer);
        KafkaProducer kafkaProducer = new KafkaProducer(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("value.deserializer", byteArrayDeserializer);
        consumerProps.setProperty("fetch.min.bytes", "1");
        consumerProps.setProperty("auto.offset.reset", "earliest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(consumerProps);

        madmin.createTopic(fullTopic, 4);

        final int partitionCount = 1;
        final int msgsPerPartition = 10;
        ProducerMultiTest.CountCallback callback =
            new ProducerMultiTest.CountCallback(msgsPerPartition * partitionCount);
        SendMessagesToProducer sendTask =
            new SendMessagesToProducer(kafkaProducer, callback, fullTopic, partitionCount, msgsPerPartition);
        ConsumeMessages consumeTask =
            new ConsumeMessages(kafkaConsumer, fullTopic, partitionCount, msgsPerPartition, 20, true);

        Thread producerThread = new Thread(sendTask);
        Thread consumerThread = new Thread(consumeTask);

        // The consumer gets a two-second head start before the producer is launched.
        consumerThread.start();
        try {
            Thread.sleep(2000L);
        }
        catch (Exception e) {
            System.out.println("Sleep interrupted " + e);
        }
        producerThread.start();

        producerThread.join();
        consumerThread.join();
        Assert.assertTrue(consumeTask.hasPassed);
        madmin.deleteStream(streamPath);
    }

    /**
     * Worker that polls one topic until an expected number of messages has arrived or a
     * poll budget is exhausted.
     *
     * <p>In seek-to-end mode the worker assigns partitions {@code 0..numparts-1} of the topic
     * explicitly and seeks each one to its end before polling; otherwise it subscribes to the
     * topic. The consumer passed to the constructor is always closed by {@link #run()}, also
     * when subscription, seeking or polling throws.
     *
     * <p>{@link #hasPassed} is {@code true} only after a run that received exactly the
     * expected number of messages; read it after joining the worker's thread.
     */
    public class ConsumeMessages
    implements Runnable {
        private final String streamTopicName;
        private final int numPartitions;
        private final int numMsgsPerPartition;
        // Wildcard instead of a raw type so that assignment() is typed as Set<TopicPartition>.
        private final KafkaConsumer<?, ?> consumer;
        private final int numPolls;
        private final int expectedNumMsgs;
        /** Outcome of the most recent {@link #run()}; reset to {@code false} when a run starts. */
        public boolean hasPassed;
        private final boolean seekToEnd;

        /**
         * @param c                       consumer to poll with; closed at the end of {@link #run()}
         * @param topicName               full topic name to consume from
         * @param numparts                number of partitions; used for explicit assignment in
         *                                seek-to-end mode and for the expected message count
         * @param numMessagesPerPartition messages expected per partition
         * @param npolls                  maximum number of one-second polls
         * @param seekToEnd               whether to assign partitions and seek to their end
         *                                instead of subscribing to the topic
         */
        public ConsumeMessages(KafkaConsumer<?, ?> c, String topicName, int numparts, int numMessagesPerPartition, int npolls, boolean seekToEnd) {
            this.streamTopicName = topicName;
            this.numPartitions = numparts;
            this.numMsgsPerPartition = numMessagesPerPartition;
            this.consumer = c;
            this.numPolls = npolls;
            this.expectedNumMsgs = this.numMsgsPerPartition * this.numPartitions;
            this.seekToEnd = seekToEnd;
        }

        /**
         * Attaches to the topic, polls up to {@code npolls} times with a one-second timeout and
         * stops early once the running total equals the expected count.
         *
         * @throws RuntimeException if the total received differs from the expected count
         */
        @Override
        public void run() {
            this.hasPassed = false;
            int numConsumedMsgs = 0;
            try {
                if (this.seekToEnd) {
                    System.out.println("Seeking to end");
                    ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
                    for (int i = 0; i < this.numPartitions; ++i) {
                        partitions.add(new TopicPartition(this.streamTopicName, i));
                    }
                    this.consumer.assign(partitions);
                    Set<TopicPartition> subscribed = this.consumer.assignment();
                    for (TopicPartition p : subscribed) {
                        this.consumer.seekToEnd(new TopicPartition[]{p});
                        System.out.println("Subscribed to " + p.topic() + " partition:" + p.partition() + " position:" + this.consumer.position(p));
                    }
                } else {
                    ArrayList<String> topics = new ArrayList<String>();
                    topics.add(this.streamTopicName);
                    this.consumer.subscribe(topics);
                }
                for (int i = 0; i < this.numPolls; ++i) {
                    ConsumerRecords<?, ?> recs = this.consumer.poll(1000L);
                    numConsumedMsgs += recs.count();
                    // Stop only on an exact match; an overshoot keeps polling and fails below.
                    if (numConsumedMsgs == this.expectedNumMsgs) {
                        break;
                    }
                }
                _logger.info("Msgs recived {}, expected {}", numConsumedMsgs, this.expectedNumMsgs);
            }
            finally {
                // Release the consumer even when attaching or polling failed.
                this.consumer.close();
            }
            if (numConsumedMsgs != this.expectedNumMsgs) {
                throw new RuntimeException("Num msgs received " + numConsumedMsgs + ", expected " + this.expectedNumMsgs);
            }
            this.hasPassed = true;
        }
    }
}

