/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.impl.admin.MStreamDescriptor;
import com.mapr.streams.impl.admin.MarlinAdmin;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Cluster tests for topic subscriptions made through {@link KafkaConsumer}
 * against streams created with {@link MarlinAdmin}: subscriptions by regular
 * expression, by explicit topic list, and by full-path topic name, with and
 * without a consumer group.
 *
 * <p>Every test works on its own stream so topics created by one test cannot
 * influence the partition counts asserted by another.
 */
@Category(value={ClusterTest.class})
public class ListenerRegexTest
extends BaseTest {
    private static final Logger _logger = LoggerFactory.getLogger(ListenerRegexTest.class);
    private static final String STREAM = "/jtest-" + ListenerRegexTest.class.getSimpleName();
    private static final String STREAMLG = STREAM + "-LG";
    private static final String STREAMFP = STREAM + "-FP";
    private static final String STREAMFPLG = STREAM + "-FPLG";
    private static final String STREAMOVERLAP = STREAM + "-OVER";
    private static final String STREAMDOT = STREAM + "-DOT";
    private static final String STREAMTWOLISTENER = STREAM + "-TWO";
    /** Every stream used by this class, in creation order. */
    private static final String[] ALL_STREAMS = {
        STREAM, STREAMLG, STREAMFP, STREAMFPLG, STREAMOVERLAP, STREAMDOT, STREAMTWOLISTENER
    };
    private static final String BYTE_ARRAY_DESERIALIZER = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    /** Pause given to the consumer after an operation before its state is inspected. */
    private static final long SETTLE_MS = 150L;
    /** Upper bound on the number of wait-and-check rounds in the two-consumer test. */
    private static final int MAX_TRIES = 5;
    /** Longest time to wait for a single rebalance callback before giving up. */
    private static final long CALLBACK_TIMEOUT_MS = 10000L;
    private static MarlinAdmin madmin;
    /** Partition count used for the streams' default and for single-partition topics. */
    private static final int numParts = 1;

    /**
     * Creates the admin client and (re)creates every stream used by the tests.
     * Streams left over from an earlier run are deleted first.
     *
     * @throws Exception if the admin client or any stream cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = new MarlinAdmin(new Configuration());
        for (String stream : ALL_STREAMS) {
            try {
                madmin.deleteStream(stream);
            }
            catch (Exception e) {
                // Best effort: the stream normally does not exist on a clean cluster.
                _logger.debug("Ignoring failure to delete stream " + stream + " before setup", e);
            }
        }
        MStreamDescriptor sdesc = new MStreamDescriptor();
        sdesc.setDefaultPartitions(numParts);
        for (String stream : ALL_STREAMS) {
            madmin.createStream(stream, sdesc);
        }
    }

    /**
     * Deletes every stream created by {@link #setupTestClass()}. All streams are
     * attempted even when one deletion fails; the first failure is rethrown.
     *
     * @throws Exception the first deletion failure, if any
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        if (madmin == null) {
            // Setup failed before the admin client existed; nothing to clean up.
            return;
        }
        Exception firstFailure = null;
        for (String stream : ALL_STREAMS) {
            try {
                madmin.deleteStream(stream);
            }
            catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    _logger.warn("Failed to delete stream " + stream, e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Switches one consumer between overlapping regex subscriptions and checks
     * after every switch that both the assignment and the subscription reflect
     * exactly the topics matched by the most recent pattern.
     */
    @Test
    public void testTopicOverlapRegexSubscription() throws IOException {
        KafkaConsumer<byte[], byte[]> consumer = newConsumer(3000);
        try {
            String[] created = {"topic0", "topic1", "topic2", "topic3", "topic4", "topic1A", "topic22"};
            for (String name : created) {
                madmin.createTopic(STREAMOVERLAP + ":" + name, numParts);
            }
            subscribeOverlapAndVerify(consumer, "topic[0-2]$", "topic0", "topic1", "topic2");
            subscribeOverlapAndVerify(consumer, "topic[1-4]$", "topic1", "topic2", "topic3", "topic4");
            unsubscribeAndVerify(consumer);
            subscribeOverlapAndVerify(consumer, "topic[0-2]$", "topic0", "topic1", "topic2");
            subscribeOverlapAndVerify(consumer, "topic[0-4]$", "topic0", "topic1", "topic2", "topic3", "topic4");
            unsubscribeAndVerify(consumer);
        }
        finally {
            consumer.close();
        }
    }

    /**
     * Exercises list subscriptions (including rejected {@code null} entries) and
     * a series of regex subscriptions whose expected match count is computed by
     * applying the same pattern to the bare topic names with
     * {@link java.util.regex.Matcher#matches()}.
     */
    @Test
    public void testTopicDotRegexSubscription() throws IOException {
        KafkaConsumer<byte[], byte[]> consumer = newConsumer(100);
        try {
            List<String> subscrTopics = Arrays.asList(
                "t-pic", "t.pic", "t_pic", "tApic", "tbpic", "t1pic", "t1picABC",
                "xad", "x123", "x", "abcx123", "-x123");
            for (String topic : subscrTopics) {
                madmin.createTopic(STREAMDOT + ":" + topic, numParts);
            }

            // A list holding only null, then a list with null in the middle, must both be rejected.
            List<String> onlyNull = new ArrayList<String>();
            onlyNull.add(null);
            assertSubscribeRejected(consumer, onlyNull, "subscribe to null topic string... ");
            List<String> withNull = new ArrayList<String>(
                Arrays.asList(STREAMDOT + ":t.pic", null, STREAMDOT + ":testing"));
            assertSubscribeRejected(consumer, withNull, "subscribe to null topic in array failed... ");

            // Empty list: nothing subscribed, nothing assigned.
            subscribeNamesAndVerify(consumer, "subscribe unset list: ", 0);
            // One of the three names does not exist, so only two partitions are assigned.
            subscribeNamesAndVerify(consumer, "subscribe valid list: ", 2, "DOESNOTEXIST", "t1pic", "x123");
            consumer.unsubscribe();
            subscribeNamesAndVerify(consumer, "subscribe topic t.pic: ", 1, "t.pic");
            consumer.unsubscribe();

            String[] patterns = {
                "t.pic", "t.pic$", "t.{1}pic", "t.{1}pic$", "x.*$", "^x.*$", ".*$",
                "^[:alpha:]*$", "^\\w*$", ".*::.*", ".*:.*"
            };
            for (String pattern : patterns) {
                Pattern topicRegex = Pattern.compile(pattern);
                List<String> expectedTopics = new ArrayList<String>();
                for (String topic : subscrTopics) {
                    if (topicRegex.matcher(topic).matches()) {
                        expectedTopics.add(topic);
                    }
                }
                int expectedMatch = expectedTopics.size() * numParts;

                consumer.subscribe(Pattern.compile(STREAMDOT + ":" + pattern), (ConsumerRebalanceListener)null);
                Set<TopicPartition> assigned = consumer.assignment();
                dump("subscribe regex " + pattern + ": ", assigned);
                if (assigned.size() != expectedMatch) {
                    // Print what the pattern should have matched to ease diagnosis of the failure below.
                    _logger.info("Expected matches for topic pattern " + pattern);
                    for (String topic : expectedTopics) {
                        System.out.println(pattern + " matched " + topic);
                    }
                }
                Assert.assertEquals("partitions assigned for pattern " + pattern, expectedMatch, assigned.size());
                consumer.unsubscribe();
                Assert.assertEquals("partitions left after unsubscribe from pattern " + pattern,
                    0, consumer.assignment().size());
            }

            // A pattern with nothing but a quantifier after the stream separator must be rejected.
            Exception rejection = null;
            try {
                consumer.subscribe(Pattern.compile(STREAMDOT + ":*"), (ConsumerRebalanceListener)null);
            }
            catch (Exception e) {
                rejection = e;
            }
            Assert.assertTrue("expected IllegalArgumentException but got " + rejection,
                rejection instanceof IllegalArgumentException);
            consumer.unsubscribe();
        }
        finally {
            consumer.close();
        }
    }

    /** Runs the single-consumer regex scenario without a consumer group or rebalance listener. */
    @Test
    public void testTopicRegexSubscription() throws IOException {
        this.runRegexTestWithSingleListener(newConsumer(100), STREAM, null);
    }

    /** Runs the single-consumer regex scenario as a member of a consumer group. */
    @Test
    public void testListenerGroupTopicRegexSubscription() throws IOException {
        this.runRegexTestWithSingleListener(newGroupConsumer("regexgroup"), STREAMLG, new RebalanceCb());
    }

    /**
     * Has two consumers of the same group subscribe to, and unsubscribe from,
     * the same regex and checks how the matching partitions are shared between
     * them after every step.
     */
    @Test
    public void testListenerGroupTwoListenersRegex() throws IOException {
        RebalanceCb cb1 = new RebalanceCb();
        KafkaConsumer<byte[], byte[]> consumer1 = newGroupConsumer("regexgroup");
        try {
            RebalanceCb cb2 = new RebalanceCb();
            KafkaConsumer<byte[], byte[]> consumer2 = newGroupConsumer("regexgroup");
            try {
                runTwoListenerScenario(consumer1, cb1, consumer2, cb2);
            }
            finally {
                consumer2.close();
            }
        }
        finally {
            consumer1.close();
        }
    }

    /** Runs the full-path subscription scenario as a member of a consumer group. */
    @Test
    public void testListenerGroupSubscribeFullPath() throws IOException {
        this.runFullPathTest(newGroupConsumer("fullpathgroup"), STREAMFPLG, new RebalanceCb());
    }

    /** Runs the full-path subscription scenario without a consumer group or rebalance listener. */
    @Test
    public void testListenerSubscribeFullPath() throws IOException {
        this.runFullPathTest(newConsumer(100), STREAMFP, null);
    }

    /**
     * Subscribes to the same topic under two spellings (stream-relative and
     * prefixed with {@code /mapr/my.cluster.com}) and expects a single assigned
     * partition. The consumer is always closed before returning.
     *
     * @param consumer   consumer to drive; closed by this method
     * @param streamName stream in which the topic is created
     * @param cb         rebalance listener to register, or {@code null} for none
     */
    private void runFullPathTest(KafkaConsumer<byte[], byte[]> consumer, String streamName, RebalanceCb cb) throws IOException {
        try {
            madmin.createTopic(streamName + ":topic", numParts);
            List<String> topic = new ArrayList<String>();
            topic.add(streamName + ":topic");
            // NOTE(review): the cluster name is hard-coded; the test presumably only passes on a cluster with this name.
            topic.add("/mapr/my.cluster.com" + streamName + ":topic");
            if (cb != null) {
                consumer.subscribe(topic, (ConsumerRebalanceListener)cb);
            } else {
                consumer.subscribe(topic);
            }
            pause();
            expectAssigned(cb);
            Assert.assertEquals("partitions assigned for full-path subscription", 1, consumer.assignment().size());
        }
        finally {
            consumer.close();
        }
    }

    /**
     * Drives one consumer through a sequence of regex subscriptions on
     * {@code streamName}, creating and deleting topics along the way, and checks
     * the number of assigned partitions after every step. The consumer is always
     * closed before returning.
     *
     * @param consumer   consumer to drive; closed by this method
     * @param streamName stream in which the topics are created
     * @param cb         rebalance listener to register, or {@code null} for none;
     *                   when present, each expected callback must arrive within
     *                   {@link #CALLBACK_TIMEOUT_MS}
     */
    private void runRegexTestWithSingleListener(KafkaConsumer<byte[], byte[]> consumer, String streamName, RebalanceCb cb) throws IOException {
        try {
            // No topic exists yet, so the pattern must not match anything.
            consumer.subscribe(Pattern.compile(streamName + ":topic.*$"), (ConsumerRebalanceListener)cb);
            pause();
            Assert.assertEquals("partitions assigned before any topic exists",
                0, dumpAssignment("SHOULD NOT SEE ANY STREAMS ", consumer));

            String[] created = {"topic1", "topic", "topi", "abcd", "ab", "007topic", "1234567", "234567"};
            for (String name : created) {
                madmin.createTopic(streamName + ":" + name, numParts);
            }
            pause();
            expectAssigned(cb);
            // Matches "topic1" and "topic".
            Assert.assertEquals("partitions assigned for topic.*$",
                2, dumpAssignment("subcribe " + streamName + " :topic.*$ ", consumer));

            resubscribe(consumer, Pattern.compile(streamName + ":ab+$"), cb);
            // Matches "ab" only.
            Assert.assertEquals("partitions assigned for ab+$",
                1, dumpAssignment("subscribe " + streamName + ":ab+$ ", consumer));

            resubscribe(consumer, Pattern.compile(streamName + ":[0,1]+.*"), cb);
            // Matches "007topic" and "1234567".
            Assert.assertEquals("partitions assigned for [0,1]+.*",
                2, dumpAssignment("subscribe " + streamName + ":[0,1]+ ", consumer));

            // Only "007topic" is part of the current subscription; deleting it must drop its partition.
            madmin.deleteTopic(streamName + ":topic1");
            madmin.deleteTopic(streamName + ":007topic");
            pause();
            expectRevoked(cb);
            Assert.assertEquals("partitions assigned after topic deletion",
                1, dumpAssignment("deleted topic1 and 007topic", consumer));

            consumer.unsubscribe();
            pause();
            expectRevoked(cb);
            Assert.assertEquals("partitions assigned after unsubscribe",
                0, dumpAssignment("unsubscrbe ", consumer));
        }
        finally {
            consumer.close();
        }
    }

    /**
     * Body of {@link #testListenerGroupTwoListenersRegex()}: seven two-partition
     * topics match the pattern, so the group owns fourteen partitions whenever
     * at least one member is subscribed.
     */
    private static void runTwoListenerScenario(KafkaConsumer<byte[], byte[]> consumer1, RebalanceCb cb1,
            KafkaConsumer<byte[], byte[]> consumer2, RebalanceCb cb2) throws IOException {
        final int partitionsPerTopic = 2;
        String[] matchingTopics = {"topic0", "topic1", "topic2", "topic3", "topic4", "topic1A", "topic22"};
        String[] otherTopics = {"abc", "12345"};
        for (String name : matchingTopics) {
            madmin.createTopic(STREAMTWOLISTENER + ":" + name, partitionsPerTopic);
        }
        for (String name : otherTopics) {
            madmin.createTopic(STREAMTWOLISTENER + ":" + name, partitionsPerTopic);
        }
        final int allPartitions = matchingTopics.length * partitionsPerTopic;
        Pattern regex = Pattern.compile(STREAMTWOLISTENER + ":^topic.*$");

        // Consumer 1 alone: it owns everything.
        consumer1.subscribe(regex, (ConsumerRebalanceListener)cb1);
        retryUntilAssigned(cb1, allPartitions, consumer1, null);
        cb1.clear();
        cb2.clear();
        int owned1 = dumpAssignment("consumer 1 subscribe regexArr, consumer 1: ", consumer1);
        int owned2 = dumpAssignment("consumer 1 subscribe regexArr, consumer 2: ", consumer2);
        Assert.assertEquals("consumer 1 partitions after consumer 1 subscribed", allPartitions, owned1);
        Assert.assertEquals("consumer 2 partitions after consumer 1 subscribed", 0, owned2);

        // Consumer 2 joins: the partitions are split, in whatever proportion.
        consumer2.subscribe(regex, (ConsumerRebalanceListener)cb2);
        retryUntilAssigned(cb2, allPartitions, consumer1, consumer2);
        cb1.clear();
        cb2.clear();
        owned1 = dumpAssignment("consumer 2 subscribe regexArr, consumer 1: ", consumer1);
        owned2 = dumpAssignment("consumer 2 subscribe regexArr, consumer 2: ", consumer2);
        Assert.assertEquals("partitions owned by the group after consumer 2 subscribed",
            allPartitions, owned1 + owned2);

        // Consumer 1 leaves: consumer 2 takes over everything.
        consumer1.unsubscribe();
        retryUntilRevoked(cb1, 0, consumer1, null);
        retryUntilAssigned(cb2, allPartitions, consumer2, null);
        cb1.clear();
        cb2.clear();
        owned1 = dumpAssignment("consumer 1 unsubscribe regexArr, consumer 1: ", consumer1);
        owned2 = dumpAssignment("consumer 1 unsubscribe regexArr, consumer 2: ", consumer2);
        Assert.assertEquals("consumer 1 partitions after consumer 1 unsubscribed", 0, owned1);
        Assert.assertEquals("consumer 2 partitions after consumer 1 unsubscribed", allPartitions, owned2);

        // Consumer 2 leaves: nobody owns anything.
        consumer2.unsubscribe();
        retryUntilRevoked(cb2, 0, consumer2, null);
        cb1.clear();
        cb2.clear();
        owned1 = dumpAssignment("consumer 2 unsubscribe regexArr, consumer 1: ", consumer1);
        owned2 = dumpAssignment("consumer 2 unsubscribe regexArr, consumer 2: ", consumer2);
        Assert.assertEquals("consumer 1 partitions after consumer 2 unsubscribed", 0, owned1);
        Assert.assertEquals("consumer 2 partitions after consumer 2 unsubscribed", 0, owned2);

        // Consumer 1 comes back: it owns everything again.
        consumer1.subscribe(regex, (ConsumerRebalanceListener)cb1);
        retryUntilAssigned(cb1, allPartitions, consumer1, null);
        cb1.clear();
        cb2.clear();
        owned1 = dumpAssignment("consumer 1 re-subscribe regexArr, consumer 1: ", consumer1);
        owned2 = dumpAssignment("consumer 1 re-subscribe regexArr, consumer 2: ", consumer2);
        Assert.assertEquals("consumer 1 partitions after consumer 1 re-subscribed", allPartitions, owned1);
        Assert.assertEquals("consumer 2 partitions after consumer 1 re-subscribed", 0, owned2);
    }

    /**
     * Builds the consumer configuration shared by all tests.
     *
     * @param groupId          value for {@code group.id}, or {@code null} to leave it unset
     * @param metadataMaxAgeMs value stored under the key returned by
     *                         {@code MarlinConfigDefaults.getMetadataMaxAge()}
     */
    private static Properties consumerProps(String groupId, int metadataMaxAgeMs) {
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.put("value.deserializer", BYTE_ARRAY_DESERIALIZER);
        props.put("auto.offset.reset", "earliest");
        if (groupId != null) {
            props.put("group.id", groupId);
        }
        props.put(cdef.getMetadataMaxAge(), Integer.valueOf(metadataMaxAgeMs));
        return props;
    }

    /** Creates a group-less consumer through the properties-only constructor. */
    private static KafkaConsumer<byte[], byte[]> newConsumer(int metadataMaxAgeMs) {
        return new KafkaConsumer<byte[], byte[]>(consumerProps(null, metadataMaxAgeMs));
    }

    /** Creates a consumer in {@code groupId} through the constructor taking explicit deserializers. */
    private static KafkaConsumer<byte[], byte[]> newGroupConsumer(String groupId) {
        return new KafkaConsumer<byte[], byte[]>(
            consumerProps(groupId, 100), new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Subscribes to {@code STREAMOVERLAP:topicPattern} and verifies, without
     * waiting, that exactly {@code matchingTopics} are subscribed and assigned;
     * finishes with a short poll.
     */
    private static void subscribeOverlapAndVerify(KafkaConsumer<byte[], byte[]> consumer, String topicPattern, String... matchingTopics) {
        Set<String> expectedTopics = new HashSet<String>();
        for (String name : matchingTopics) {
            expectedTopics.add(STREAMOVERLAP + ":" + name);
        }
        consumer.subscribe(Pattern.compile(STREAMOVERLAP + ":" + topicPattern), (ConsumerRebalanceListener)null);
        Set<TopicPartition> assigned = consumer.assignment();
        Set<String> subscribed = consumer.subscription();
        dump("subscribe " + topicPattern + ": ", assigned);
        Assert.assertEquals("partitions assigned for " + topicPattern,
            expectedTopics.size() * numParts, assigned.size());
        Assert.assertEquals("topics subscribed for " + topicPattern, expectedTopics, subscribed);
        consumer.poll(100L);
    }

    /** Unsubscribes and verifies that assignment and subscription are both empty; finishes with a short poll. */
    private static void unsubscribeAndVerify(KafkaConsumer<byte[], byte[]> consumer) {
        consumer.unsubscribe();
        Set<TopicPartition> assigned = consumer.assignment();
        Set<String> subscribed = consumer.subscription();
        dump("unsubscribe: ", assigned);
        Assert.assertEquals("partitions assigned after unsubscribe", 0, assigned.size());
        Assert.assertTrue("topics still subscribed after unsubscribe: " + subscribed, subscribed.isEmpty());
        consumer.poll(100L);
    }

    /**
     * Subscribes to the named topics of {@code STREAMDOT} and verifies the
     * number of assigned partitions and that every name, existing or not, is
     * reported as subscribed.
     */
    private static void subscribeNamesAndVerify(KafkaConsumer<byte[], byte[]> consumer, String label, int expectedPartitions, String... topicNames) {
        List<String> topics = new ArrayList<String>();
        for (String name : topicNames) {
            topics.add(STREAMDOT + ":" + name);
        }
        consumer.subscribe(topics);
        Set<TopicPartition> assigned = consumer.assignment();
        dump(label, assigned);
        Assert.assertEquals("partitions assigned for " + topics, expectedPartitions, assigned.size());
        Assert.assertEquals("topics subscribed for " + topics, topics.size(), consumer.subscription().size());
    }

    /** Asserts that subscribing to {@code topics} throws; the exception is printed after {@code label}. */
    private static void assertSubscribeRejected(KafkaConsumer<byte[], byte[]> consumer, List<String> topics, String label) {
        boolean rejected = false;
        try {
            consumer.subscribe(topics);
        }
        catch (Exception e) {
            System.out.println(label + e);
            rejected = true;
        }
        Assert.assertTrue("subscribe should have rejected " + topics, rejected);
    }

    /** Replaces the current subscription with {@code regex} and waits for the revoke and assign callbacks. */
    private static void resubscribe(KafkaConsumer<byte[], byte[]> consumer, Pattern regex, RebalanceCb cb) {
        consumer.subscribe(regex, (ConsumerRebalanceListener)cb);
        pause();
        expectRevoked(cb);
        pause();
        expectAssigned(cb);
    }

    /** Fails unless {@code cb}, when present, reports an assignment within {@link #CALLBACK_TIMEOUT_MS}. */
    private static void expectAssigned(RebalanceCb cb) {
        if (cb != null) {
            Assert.assertTrue("no partition-assigned callback within " + CALLBACK_TIMEOUT_MS + " ms",
                cb.assignDone(CALLBACK_TIMEOUT_MS));
        }
    }

    /** Fails unless {@code cb}, when present, reports a revocation within {@link #CALLBACK_TIMEOUT_MS}. */
    private static void expectRevoked(RebalanceCb cb) {
        if (cb != null) {
            Assert.assertTrue("no partition-revoked callback within " + CALLBACK_TIMEOUT_MS + " ms",
                cb.revokeDone(CALLBACK_TIMEOUT_MS));
        }
    }

    /** Waits for assign callbacks on {@code cb} until the consumers own {@code expectedPartitions} in total. */
    private static void retryUntilAssigned(RebalanceCb cb, int expectedPartitions,
            KafkaConsumer<byte[], byte[]> first, KafkaConsumer<byte[], byte[]> second) {
        retryUntilOwned(cb, true, expectedPartitions, first, second);
    }

    /** Waits for revoke callbacks on {@code cb} until the consumers own {@code expectedPartitions} in total. */
    private static void retryUntilRevoked(RebalanceCb cb, int expectedPartitions,
            KafkaConsumer<byte[], byte[]> first, KafkaConsumer<byte[], byte[]> second) {
        retryUntilOwned(cb, false, expectedPartitions, first, second);
    }

    /**
     * Repeats "pause, wait for a callback, count partitions" at most
     * {@link #MAX_TRIES} times and returns as soon as the count matches. A
     * missing callback or a count that never matches is not reported here; the
     * caller's assertions on the final assignment decide the outcome.
     *
     * @param second optional second consumer whose partitions are added to the count; may be {@code null}
     */
    private static void retryUntilOwned(RebalanceCb cb, boolean waitForAssign, int expectedPartitions,
            KafkaConsumer<byte[], byte[]> first, KafkaConsumer<byte[], byte[]> second) {
        for (int attempt = 0; attempt < MAX_TRIES; ++attempt) {
            pause();
            if (waitForAssign) {
                cb.assignDone(CALLBACK_TIMEOUT_MS);
            } else {
                cb.revokeDone(CALLBACK_TIMEOUT_MS);
            }
            int owned = first.assignment().size();
            if (second != null) {
                owned += second.assignment().size();
            }
            if (owned == expectedPartitions) {
                return;
            }
        }
    }

    /** Prints the consumer's current assignment, one partition per line, and returns its size. */
    private static int dumpAssignment(String label, KafkaConsumer<byte[], byte[]> consumer) {
        Set<TopicPartition> assigned = consumer.assignment();
        dump(label, assigned);
        return assigned.size();
    }

    /** Prints every partition on its own line, prefixed with {@code label}. */
    private static void dump(String label, Set<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            System.out.println(label + partition);
        }
    }

    /** Sleeps for {@link #SETTLE_MS}; an interrupt ends the pause early and is left pending for the caller. */
    private static void pause() {
        try {
            Thread.sleep(SETTLE_MS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            _logger.warn("Interrupted while pausing", e);
        }
    }

    /**
     * Rebalance listener that records whether an assign or revoke callback has
     * happened and lets the test thread wait for either with a timeout. All
     * state is guarded by the instance monitor.
     */
    private static final class RebalanceCb
    implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        /** Forgets any callback recorded so far. */
        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        /**
         * Waits until a revoke callback has been recorded, then consumes it.
         *
         * @param timeoutMs longest time to wait, in milliseconds
         * @return {@code true} if a callback was recorded; {@code false} on
         *         timeout or if the waiting thread was interrupted
         */
        public synchronized boolean revokeDone(long timeoutMs) {
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            while (!this.revoked) {
                if (!this.waitUntil(deadlineNanos)) {
                    return false;
                }
            }
            this.revoked = false;
            return true;
        }

        /**
         * Waits until an assign callback has been recorded, then consumes it.
         *
         * @param timeoutMs longest time to wait, in milliseconds
         * @return {@code true} if a callback was recorded; {@code false} on
         *         timeout or if the waiting thread was interrupted
         */
        public synchronized boolean assignDone(long timeoutMs) {
            long deadlineNanos = System.nanoTime() + timeoutMs * 1000000L;
            while (!this.assigned) {
                if (!this.waitUntil(deadlineNanos)) {
                    return false;
                }
            }
            this.assigned = false;
            return true;
        }

        /**
         * Waits on this monitor until notified or until the deadline passes. The
         * caller must already hold the monitor and must re-check its condition
         * when {@code true} is returned.
         *
         * @return {@code false} if the deadline has passed or the thread was
         *         interrupted (the interrupt flag is restored)
         */
        private boolean waitUntil(long deadlineNanos) {
            long remainingMs = (deadlineNanos - System.nanoTime()) / 1000000L;
            if (remainingMs <= 0L) {
                // wait(0) would block without limit, so stop here instead.
                return false;
            }
            try {
                this.wait(remainingMs);
                return true;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Records that partitions were assigned and wakes every waiting thread. */
        @Override
        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.assigned = true;
            this.notifyAll();
        }

        /** Records that partitions were revoked and wakes every waiting thread. */
        @Override
        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.revoked = true;
            this.notifyAll();
        }
    }
}

