/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TopicRefreshListListener;
import com.mapr.streams.TopicRefreshRegexListener;
import com.mapr.streams.impl.listener.MarlinListener;
import com.mapr.streams.tests.listener.ListenerRegexTest;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ListenerTopicRefresherTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerRegexTest.class);
    private static final String STREAM = "/jtest-" + ListenerTopicRefresherTest.class.getSimpleName();
    private static Admin madmin;
    private static final int numParts = 1;
    private static final Properties props;
    private static final Marlinserver.MarlinConfigDefaults cdef;

    private static void waitForMillis(long waitMillis) {
        try {
            Thread.sleep(waitMillis);
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }

    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        madmin = Streams.newAdmin((Configuration)conf);
        try {
            madmin.deleteStream(STREAM);
        }
        catch (Exception exception) {
            // empty catch block
        }
        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(1);
        madmin.createStream(STREAM, sdesc);
        madmin.createTopic(STREAM, "topic0", 1);
        madmin.createTopic(STREAM, "topic1", 1);
        madmin.createTopic(STREAM, "topic2", 1);
        madmin.createTopic(STREAM, "topic3", 1);
        madmin.createTopic(STREAM, "topic4", 1);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("streams.consumer.default.stream", STREAM);
        props.put(cdef.getMetadataMaxAge(), (Object)100);
    }

    @AfterClass
    public static void cleanupTestClass() throws Exception {
        madmin.deleteTopic(STREAM, "topic0");
        madmin.deleteTopic(STREAM, "topic1");
        madmin.deleteTopic(STREAM, "topic2");
        madmin.deleteTopic(STREAM, "topic3");
        madmin.deleteTopic(STREAM, "topic4");
        madmin.deleteTopic(STREAM, "topic5");
        madmin.deleteStream(STREAM);
    }

    @Test
    public void testTopicRefresherRegex() throws IOException {
        Pattern regex = Pattern.compile(STREAM + ":topic[0-1]$");
        Pattern regex2 = Pattern.compile(STREAM + ":topic[2-5]$");
        GenericHFactory configFactory = new GenericHFactory();
        ConsumerConfig config = (ConsumerConfig)configFactory.getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{props}, new Class[]{Map.class});
        MarlinListener marlinListener = new MarlinListener(config, null, null);
        final class TestRegexListener
        implements TopicRefreshRegexListener {
            public int numTopics = 0;
            public boolean unsubscribed = false;

            TestRegexListener() {
            }

            public void updatedTopics(Set<String> topics) {
                if (topics.size() == 0) {
                    System.err.println("TestRegexListener -> updatedTopics() - Unsubscribe");
                    this.unsubscribed = true;
                    return;
                }
                this.numTopics = topics.size();
                System.err.println("TestRegexListener -> updatedTopics() - Subscribe");
                for (String t : topics) {
                    System.err.println("Topic = " + t);
                }
            }
        }
        TestRegexListener regexCb = new TestRegexListener();
        marlinListener.topicRefresherRegex(null, (TopicRefreshRegexListener)regexCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertTrue((boolean)regexCb.unsubscribed);
        regexCb.unsubscribed = false;
        marlinListener.topicRefresherRegex(regex, (TopicRefreshRegexListener)regexCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)2L, (long)regexCb.numTopics);
        marlinListener.topicRefresherRegex(regex2, (TopicRefreshRegexListener)regexCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)regexCb.numTopics);
        madmin.createTopic(STREAM, "topic5", 1);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)4L, (long)regexCb.numTopics);
        madmin.deleteTopic(STREAM, "topic4");
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)regexCb.numTopics);
        madmin.createTopic(STREAM, "topic4", 1);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)4L, (long)regexCb.numTopics);
        marlinListener.topicRefresherRegex(null, (TopicRefreshRegexListener)regexCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertTrue((boolean)regexCb.unsubscribed);
        marlinListener.close();
    }

    @Test
    public void testTopicRefresherList() throws IOException {
        GenericHFactory configFactory = new GenericHFactory();
        ConsumerConfig config = (ConsumerConfig)configFactory.getImplementorInstance("org.apache.kafka.clients.consumer.ConsumerConfig", new Object[]{props}, new Class[]{Map.class});
        MarlinListener marlinListener = new MarlinListener(config, null, null);
        final class TestListListener
        implements TopicRefreshListListener {
            public int numTopics = 0;
            public boolean unsubscribed = false;

            TestListListener() {
            }

            public void updatedTopics(Set<TopicPartition> topicFeeds) {
                if (topicFeeds.size() == 0) {
                    System.err.println("TestListListener -> updatedTopics() - Unsubscribe");
                    this.unsubscribed = true;
                    return;
                }
                this.numTopics = topicFeeds.size();
                System.err.println("TestListListener -> updatedTopics() - Subscribe");
                for (TopicPartition tp : topicFeeds) {
                    System.err.println("Topic = " + tp.topic() + ", Num partitions = " + tp.partition());
                }
            }
        }
        TestListListener listCb = new TestListListener();
        marlinListener.topicRefresherList(new ArrayList(0), (TopicRefreshListListener)listCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertTrue((boolean)listCb.unsubscribed);
        listCb.unsubscribed = false;
        marlinListener.topicRefresherList(Arrays.asList(STREAM + ":topic0", STREAM + ":topic1"), (TopicRefreshListListener)listCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)2L, (long)listCb.numTopics);
        marlinListener.topicRefresherList(Arrays.asList(STREAM + ":topic2", STREAM + ":topic3", STREAM + ":topic4"), (TopicRefreshListListener)listCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)listCb.numTopics);
        madmin.deleteTopic(STREAM, "topic4");
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)2L, (long)listCb.numTopics);
        madmin.createTopic(STREAM, "topic4");
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)listCb.numTopics);
        madmin.editTopic(STREAM, "topic3", 3);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)listCb.numTopics);
        madmin.editTopic(STREAM, "topic2", 5);
        madmin.editTopic(STREAM, "topic4", 6);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)3L, (long)listCb.numTopics);
        madmin.deleteTopic(STREAM, "topic2");
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertEquals((long)2L, (long)listCb.numTopics);
        marlinListener.topicRefresherList(new ArrayList(0), (TopicRefreshListListener)listCb);
        ListenerTopicRefresherTest.waitForMillis(150L);
        Assert.assertTrue((boolean)listCb.unsubscribed);
        madmin.createTopic(STREAM, "topic2", 1);
        marlinListener.close();
    }

    static {
        props = new Properties();
        cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
    }
}

