/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ProducerAndListenerDefaultStream
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ProducerAndListenerDefaultStream.class);
    private static final String STREAM = "/jtest-" + ProducerAndListenerDefaultStream.class.getSimpleName();
    private static final String DEFAULTSTREAM = "/jtest-DEFAULTSTREAM";
    private static final String STREAMLISTTOPIC = STREAM + "-listtopic";
    private static final String DEFAULTSTREAMLISTTOPIC = "/jtest-DEFAULTSTREAM-listtopic";
    private static Admin madmin;
    private static final int numParts = 1;

    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(DEFAULTSTREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(STREAMLISTTOPIC);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            madmin.deleteStream(DEFAULTSTREAMLISTTOPIC);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        madmin.createStream(STREAM, sdesc);
        madmin.createStream(DEFAULTSTREAM, sdesc);
        madmin.createStream(STREAMLISTTOPIC, sdesc);
        madmin.createStream(DEFAULTSTREAMLISTTOPIC, sdesc);
    }

    @AfterClass
    public static void cleanupTestClass() throws Exception {
        madmin.deleteStream(STREAM);
        madmin.deleteStream(DEFAULTSTREAM);
        madmin.deleteStream(STREAMLISTTOPIC);
        madmin.deleteStream(DEFAULTSTREAMLISTTOPIC);
    }

    public void testProducerAndConsumer(int numMsgs, String topicname, KafkaProducer producer, KafkaConsumer consumer) throws Exception {
        RecordMetadata rm;
        Future[] futuresDefault = new Future[numMsgs];
        Future[] futuresNonDefault = new Future[numMsgs];
        for (int i = 0; i < numMsgs; ++i) {
            String key = "key-value" + i;
            String msg = "msg-value" + i;
            ProducerRecord record = new ProducerRecord(STREAM + ":" + topicname, (Object)key.getBytes(), (Object)msg.getBytes());
            futuresNonDefault[i] = producer.send(record);
            record = new ProducerRecord(topicname, (Object)key.getBytes(), (Object)msg.getBytes());
            futuresDefault[i] = producer.send(record);
        }
        producer.flush();
        producer.close();
        long offset = -1L;
        for (Future future : futuresDefault) {
            rm = (RecordMetadata)future.get();
            Assert.assertTrue((rm.partition() == 0 ? 1 : 0) != 0);
            Assert.assertTrue((offset < rm.offset() ? 1 : 0) != 0);
            offset = rm.offset();
            Assert.assertTrue((boolean)rm.topic().equals("/jtest-DEFAULTSTREAM:" + topicname));
        }
        offset = -1L;
        for (Future future : futuresNonDefault) {
            rm = (RecordMetadata)future.get();
            Assert.assertTrue((rm.partition() == 0 ? 1 : 0) != 0);
            Assert.assertTrue((offset < rm.offset() ? 1 : 0) != 0);
            offset = rm.offset();
            Assert.assertTrue((boolean)rm.topic().equals(STREAM + ":" + topicname));
        }
        if (consumer == null) {
            return;
        }
        ArrayList<String> topics = new ArrayList<String>(2);
        topics.add(topicname);
        topics.add(STREAM + ":" + topicname);
        RebalanceCb cb = new RebalanceCb();
        consumer.subscribe(topics, (ConsumerRebalanceListener)cb);
        cb.assignDone();
        boolean done = false;
        int countDefault = 0;
        int countNonDefault = 0;
        while (!done) {
            ConsumerRecords recs = consumer.poll(1000L);
            for (ConsumerRecord oneRecord : recs.records(new TopicPartition("/jtest-DEFAULTSTREAM:" + topicname, 0))) {
                Assert.assertTrue((oneRecord.partition() == 0 ? 1 : 0) != 0);
                Assert.assertTrue((boolean)oneRecord.topic().equals("/jtest-DEFAULTSTREAM:" + topicname));
                ++countDefault;
            }
            for (ConsumerRecord oneRecord : recs.records(new TopicPartition(STREAM + ":" + topicname, 0))) {
                Assert.assertTrue((oneRecord.partition() == 0 ? 1 : 0) != 0);
                Assert.assertTrue((boolean)oneRecord.topic().equals(STREAM + ":" + topicname));
                ++countNonDefault;
            }
            if (countDefault != numMsgs || countNonDefault != numMsgs) continue;
            done = true;
        }
        consumer.close();
    }

    @Test
    public void testDefaultStreamNameForProducer() throws Exception {
        int numMsgs = 10;
        String topicname = "producertest";
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), "false");
        props.put("streams.producer.default.stream", DEFAULTSTREAM);
        KafkaProducer producer = new KafkaProducer(props);
        this.testProducerAndConsumer(numMsgs, topicname, producer, null);
    }

    @Test
    public void testDefaultStreamNameForConsumer() throws Exception {
        int numMsgs = 10;
        String topicname = "consumertest";
        madmin.createTopic(DEFAULTSTREAM, topicname);
        madmin.createTopic(STREAM, topicname);
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("streams.consumer.default.stream", DEFAULTSTREAM);
        KafkaConsumer consumer = new KafkaConsumer(props);
        ArrayList<String> topics = new ArrayList<String>(2);
        topics.add(topicname);
        topics.add(STREAM + ":" + topicname);
        RebalanceCb cb = new RebalanceCb();
        consumer.subscribe(topics, (ConsumerRebalanceListener)cb);
        cb.assignDone();
        Set subscription = consumer.subscription();
        Set assignment = consumer.assignment();
        HashSet<String> correctSubscription = new HashSet<String>();
        correctSubscription.add("/jtest-DEFAULTSTREAM:" + topicname);
        correctSubscription.add(STREAM + ":" + topicname);
        HashSet<TopicPartition> correctAssignment = new HashSet<TopicPartition>();
        correctAssignment.add(new TopicPartition("/jtest-DEFAULTSTREAM:" + topicname, 0));
        correctAssignment.add(new TopicPartition(STREAM + ":" + topicname, 0));
        Assert.assertTrue((subscription.size() == assignment.size() ? 1 : 0) != 0);
        Assert.assertTrue((boolean)correctSubscription.equals(subscription));
        Assert.assertTrue((boolean)correctAssignment.equals(assignment));
        Assert.assertTrue((consumer.position(new TopicPartition(topicname, 0)) == 0L ? 1 : 0) != 0);
        HashMap<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        toCommit.put(new TopicPartition(topicname, 0), new OffsetAndMetadata(100L));
        toCommit.put(new TopicPartition(STREAM + ":" + topicname, 0), new OffsetAndMetadata(200L));
        consumer.commitSync(toCommit);
        Assert.assertTrue((consumer.committed(new TopicPartition(topicname, 0)).offset() == 100L ? 1 : 0) != 0);
        Assert.assertTrue((consumer.committed(new TopicPartition("/jtest-DEFAULTSTREAM:" + topicname, 0)).offset() == 100L ? 1 : 0) != 0);
        Assert.assertTrue((consumer.committed(new TopicPartition(STREAM + ":" + topicname, 0)).offset() == 200L ? 1 : 0) != 0);
        consumer.unsubscribe();
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>(2);
        partitions.add(new TopicPartition(topicname, 0));
        partitions.add(new TopicPartition(STREAM + ":" + topicname, 0));
        consumer.assign(partitions);
        subscription = consumer.subscription();
        assignment = consumer.assignment();
        Assert.assertTrue((boolean)subscription.isEmpty());
        Assert.assertTrue((assignment.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)correctAssignment.equals(assignment));
        consumer.close();
    }

    @Test
    public void testDefaultStreamNameForProducerAndConsumer() throws Exception {
        int numMsgs = 10;
        String topicname = "listenerPollTest";
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), "false");
        props.put("streams.producer.default.stream", DEFAULTSTREAM);
        KafkaProducer producer = new KafkaProducer(props);
        props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("streams.consumer.default.stream", DEFAULTSTREAM);
        KafkaConsumer consumer = new KafkaConsumer(props);
        this.testProducerAndConsumer(numMsgs, topicname, producer, consumer);
    }

    @Test
    public void testListTopicsForConsumer() throws Exception {
        int number;
        Object key;
        String topicname = "list";
        int numTopics = 31;
        for (int i = 0; i < numTopics; ++i) {
            madmin.createTopic(DEFAULTSTREAMLISTTOPIC, topicname + i, i + 1);
            madmin.createTopic(STREAMLISTTOPIC, topicname + i, i + 1);
        }
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer consumer = new KafkaConsumer(props);
        Map listTopicWithoutString = null;
        Map listTopicWithString = null;
        listTopicWithoutString = consumer.listTopics();
        Assert.assertTrue((listTopicWithoutString.size() == 0 ? 1 : 0) != 0);
        listTopicWithString = consumer.listTopics(STREAMLISTTOPIC);
        Assert.assertTrue((listTopicWithString.size() == numTopics ? 1 : 0) != 0);
        String prefix = STREAMLISTTOPIC + ":" + topicname;
        HashSet<Integer> seenTopics = new HashSet<Integer>();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            key = (String)entry.getKey();
            Assert.assertTrue((boolean)((String)key).startsWith(prefix));
            key = ((String)key).substring(prefix.length());
            int n = Integer.parseInt((String)key);
            seenTopics.add(n);
            Assert.assertTrue((((List)entry.getValue()).size() == n + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == numTopics ? 1 : 0) != 0);
        listTopicWithString = consumer.listTopics(DEFAULTSTREAMLISTTOPIC);
        Assert.assertTrue((listTopicWithString.size() == numTopics ? 1 : 0) != 0);
        prefix = "/jtest-DEFAULTSTREAM-listtopic:" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            key = (String)entry.getKey();
            Assert.assertTrue((boolean)((String)key).startsWith(prefix));
            key = ((String)key).substring(prefix.length());
            int n = Integer.parseInt((String)key);
            seenTopics.add(n);
            Assert.assertTrue((((List)entry.getValue()).size() == n + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == numTopics ? 1 : 0) != 0);
        String topicPattern = "listA.*";
        Pattern pattern = Pattern.compile(topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithoutString.size() == 0 ? 1 : 0) != 0);
        pattern = Pattern.compile(STREAMLISTTOPIC + ":" + topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithoutString.size() == 0 ? 1 : 0) != 0);
        topicPattern = "list.$";
        pattern = Pattern.compile(topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 0 ? 1 : 0) != 0);
        pattern = Pattern.compile(STREAMLISTTOPIC + ":" + topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 10 ? 1 : 0) != 0);
        prefix = STREAMLISTTOPIC + ":" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            String key2 = (String)entry.getKey();
            Assert.assertTrue((boolean)key2.startsWith(prefix));
            key2 = key2.substring(prefix.length());
            int number3 = Integer.parseInt(key2);
            seenTopics.add(number3);
            Assert.assertTrue((((List)entry.getValue()).size() == number3 + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == 10 ? 1 : 0) != 0);
        consumer.close();
        props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("streams.consumer.default.stream", DEFAULTSTREAMLISTTOPIC);
        consumer = new KafkaConsumer(props);
        listTopicWithoutString = consumer.listTopics();
        Assert.assertTrue((listTopicWithoutString.size() == numTopics ? 1 : 0) != 0);
        String nullString = null;
        listTopicWithoutString = consumer.listTopics(nullString);
        Assert.assertTrue((listTopicWithoutString.size() == numTopics ? 1 : 0) != 0);
        seenTopics = new HashSet();
        prefix = "/jtest-DEFAULTSTREAM-listtopic:" + topicname;
        for (Map.Entry entry : listTopicWithoutString.entrySet()) {
            String key3 = (String)entry.getKey();
            Assert.assertTrue((boolean)key3.startsWith(prefix));
            key3 = key3.substring(prefix.length());
            number = Integer.parseInt(key3);
            seenTopics.add(number);
            Assert.assertTrue((((List)entry.getValue()).size() == number + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == numTopics ? 1 : 0) != 0);
        listTopicWithString = consumer.listTopics(DEFAULTSTREAMLISTTOPIC);
        Assert.assertTrue((listTopicWithString.size() == numTopics ? 1 : 0) != 0);
        prefix = "/jtest-DEFAULTSTREAM-listtopic:" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            String key4 = (String)entry.getKey();
            Assert.assertTrue((boolean)key4.startsWith(prefix));
            key4 = key4.substring(prefix.length());
            number = Integer.parseInt(key4);
            seenTopics.add(number);
            Assert.assertTrue((((List)entry.getValue()).size() == number + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == numTopics ? 1 : 0) != 0);
        listTopicWithString = consumer.listTopics(STREAMLISTTOPIC);
        Assert.assertTrue((listTopicWithString.size() == numTopics ? 1 : 0) != 0);
        prefix = STREAMLISTTOPIC + ":" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            String key5 = (String)entry.getKey();
            Assert.assertTrue((boolean)key5.startsWith(prefix));
            key5 = key5.substring(prefix.length());
            number = Integer.parseInt(key5);
            seenTopics.add(number);
            Assert.assertTrue((((List)entry.getValue()).size() == number + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == numTopics ? 1 : 0) != 0);
        topicPattern = "listA.*";
        pattern = Pattern.compile(topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 0 ? 1 : 0) != 0);
        pattern = Pattern.compile(STREAMLISTTOPIC + ":" + topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 0 ? 1 : 0) != 0);
        topicPattern = "list.$";
        pattern = Pattern.compile(topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 10 ? 1 : 0) != 0);
        prefix = "/jtest-DEFAULTSTREAM-listtopic:" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            String key6 = (String)entry.getKey();
            Assert.assertTrue((boolean)key6.startsWith(prefix));
            key6 = key6.substring(prefix.length());
            number = Integer.parseInt(key6);
            seenTopics.add(number);
            Assert.assertTrue((((List)entry.getValue()).size() == number + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == 10 ? 1 : 0) != 0);
        pattern = Pattern.compile(STREAMLISTTOPIC + ":" + topicPattern);
        listTopicWithString = consumer.listTopics(pattern);
        Assert.assertTrue((listTopicWithString.size() == 10 ? 1 : 0) != 0);
        prefix = STREAMLISTTOPIC + ":" + topicname;
        seenTopics = new HashSet();
        for (Map.Entry entry : listTopicWithString.entrySet()) {
            String key7 = (String)entry.getKey();
            Assert.assertTrue((boolean)key7.startsWith(prefix));
            key7 = key7.substring(prefix.length());
            number = Integer.parseInt(key7);
            seenTopics.add(number);
            Assert.assertTrue((((List)entry.getValue()).size() == number + 1 ? 1 : 0) != 0);
        }
        Assert.assertTrue((seenTopics.size() == 10 ? 1 : 0) != 0);
        consumer.close();
    }

    public final class RebalanceCb
    implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        public synchronized void revokeDone() {
            while (!this.revoked) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
            this.revoked = false;
        }

        public synchronized void assignDone() {
            while (!this.assigned) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
            this.assigned = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            RebalanceCb rebalanceCb = this;
            synchronized (rebalanceCb) {
                this.assigned = true;
                this.notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            RebalanceCb rebalanceCb = this;
            synchronized (rebalanceCb) {
                this.revoked = true;
                this.notifyAll();
            }
        }
    }
}

