package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ListenerRegexTest.class */
public class ListenerRegexTest extends BaseTest {
    // Class logger; used for diagnostics when a regex matches an unexpected number of topics.
    private static final Logger _logger = LoggerFactory.getLogger(ListenerRegexTest.class);
    // Base stream path, used by testTopicRegexSubscription. All other stream paths derive from it by suffix.
    private static final String STREAM = "/jtest-" + ListenerRegexTest.class.getSimpleName();
    // Stream for the single-listener regex scenario run with a consumer group (testListenerGroupTopicRegexSubscription).
    private static final String STREAMLG = STREAM + "-LG";
    // Stream for the full-path subscription scenario without a consumer group (testListenerSubscribeFullPath).
    private static final String STREAMFP = STREAM + "-FP";
    // Stream for the full-path subscription scenario with a consumer group (testListenerGroupSubscribeFullPath).
    private static final String STREAMFPLG = STREAM + "-FPLG";
    // Stream for the overlapping-regex scenario (testTopicOverlapRegexSubscription).
    private static final String STREAMOVERLAP = STREAM + "-OVER";
    // Stream for the list and regex edge-case scenario (testTopicDotRegexSubscription).
    private static final String STREAMDOT = STREAM + "-DOT";
    // Stream for the two-consumers-in-one-group scenario (testListenerGroupTwoListenersRegex).
    private static final String STREAMTWOLISTENER = STREAM + "-TWO";
    // Shared admin handle; assigned in setupTestClass and used by every test to create/delete topics.
    private static Admin madmin;
    // Default partition count for the streams and for most topics created by the tests.
    private static final int numParts = 1;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerRegexTest$RebalanceCb.class */
    public static final class RebalanceCb implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        public synchronized void revokeDone() {
            while (!this.revoked) {
                try {
                    wait();
                } catch (Exception e) {
                }
            }
            this.revoked = false;
        }

        public synchronized void assignDone() {
            while (!this.assigned) {
                try {
                    wait();
                } catch (Exception e) {
                }
            }
            this.assigned = false;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.assigned = true;
                notifyAll();
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.revoked = true;
                notifyAll();
            }
        }
    }

    /**
     * Creates the shared admin handle and (re)creates every stream used by this class.
     *
     * <p>Each stream is first deleted on a best-effort basis, then created with
     * {@code numParts} default partitions.
     *
     * @throws Exception if the admin handle or any stream cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());

        final String[] streamPaths = {
            STREAM, STREAMLG, STREAMFP, STREAMFPLG, STREAMOVERLAP, STREAMDOT, STREAMTWOLISTENER
        };

        for (String streamPath : streamPaths) {
            try {
                madmin.deleteStream(streamPath);
            } catch (Exception ignored) {
                // Best effort: a failed delete (presumably because the stream does not
                // exist yet) must not prevent the streams from being created below.
            }
        }

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        for (String streamPath : streamPaths) {
            madmin.createStream(streamPath, descriptor);
        }
    }

    /**
     * Deletes every stream created by {@link #setupTestClass()}.
     *
     * <p>All deletes are attempted even if one fails, so a single failure does not
     * leave the remaining streams behind. The first failure is rethrown once every
     * stream has been tried; later failures are logged.
     *
     * @throws Exception the first exception raised by a stream deletion, if any
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        if (madmin == null) {
            // Setup never obtained an admin handle, so there is nothing to clean up.
            return;
        }

        final String[] streamPaths = {
            STREAM, STREAMLG, STREAMFP, STREAMFPLG, STREAMOVERLAP, STREAMDOT, STREAMTWOLISTENER
        };

        Exception firstFailure = null;
        for (String streamPath : streamPaths) {
            try {
                madmin.deleteStream(streamPath);
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    _logger.warn("Failed to delete stream " + streamPath, e);
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Checks that subscribing with a new regex replaces the previous regex subscription,
     * including when the two patterns match overlapping sets of topics, and that
     * {@code unsubscribe()} empties both the assignment and the subscription.
     *
     * <p>The consumer is always closed, even when an assertion fails.
     *
     * @throws IOException if a topic cannot be created
     */
    @Test
    public void testTopicOverlapRegexSubscription() throws IOException {
        Properties properties = new Properties();
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put(defaultInstance.getMetadataMaxAge(), 3000);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
        try {
            // topic1A and topic22 must never match the anchored single-character patterns below.
            String[] topicNames = {"topic0", "topic1", "topic2", "topic3", "topic4", "topic1A", "topic22"};
            for (String topicName : topicNames) {
                madmin.createTopic(STREAMOVERLAP, topicName, numParts);
            }

            kafkaConsumer.subscribe(Pattern.compile(STREAMOVERLAP + ":topic[0-2]$"), (ConsumerRebalanceListener) null);
            verifyOverlapState(kafkaConsumer, "subscribe topic[0-2]$", overlapTopics("topic0", "topic1", "topic2"));

            // Overlapping pattern: topic0 drops out, topic3 and topic4 come in.
            kafkaConsumer.subscribe(Pattern.compile(STREAMOVERLAP + ":topic[1-4]$"), (ConsumerRebalanceListener) null);
            verifyOverlapState(kafkaConsumer, "subscribe topic[1-4]$",
                    overlapTopics("topic1", "topic2", "topic3", "topic4"));

            kafkaConsumer.unsubscribe();
            verifyOverlapState(kafkaConsumer, "unsubscribe", overlapTopics());

            kafkaConsumer.subscribe(Pattern.compile(STREAMOVERLAP + ":topic[0-2]$"), (ConsumerRebalanceListener) null);
            verifyOverlapState(kafkaConsumer, "subscribe topic[0-2]$", overlapTopics("topic0", "topic1", "topic2"));

            // Superset pattern: everything from the previous subscription plus topic3 and topic4.
            kafkaConsumer.subscribe(Pattern.compile(STREAMOVERLAP + ":topic[0-4]$"), (ConsumerRebalanceListener) null);
            verifyOverlapState(kafkaConsumer, "subscribe topic[0-4]$",
                    overlapTopics("topic0", "topic1", "topic2", "topic3", "topic4"));

            kafkaConsumer.unsubscribe();
            verifyOverlapState(kafkaConsumer, "unsubscribe", overlapTopics());
        } finally {
            kafkaConsumer.close();
        }
    }

    /** Builds the set of fully qualified {@code <stream>:<topic>} names on the overlap stream. */
    private static Set<String> overlapTopics(String... topicNames) {
        Set<String> qualified = new HashSet<String>();
        for (String topicName : topicNames) {
            qualified.add(STREAMOVERLAP + ":" + topicName);
        }
        return qualified;
    }

    /**
     * Prints the consumer's current assignment, asserts that it holds {@code numParts}
     * partitions per expected topic and that the subscription equals
     * {@code expectedTopics}, then polls once.
     */
    private static void verifyOverlapState(KafkaConsumer<byte[], byte[]> consumer, String label,
            Set<String> expectedTopics) {
        Set<TopicPartition> assignment = consumer.assignment();
        Set<String> subscription = consumer.subscription();
        for (TopicPartition partition : assignment) {
            System.out.println(label + ": " + partition);
        }
        Assert.assertEquals(expectedTopics.size() * numParts, assignment.size());
        Assert.assertEquals(expectedTopics, subscription);
        consumer.poll(100L);
    }

    /**
     * Exercises list and regex subscriptions on the DOT stream:
     * <ol>
     *   <li>a list containing a {@code null} topic must be rejected with an exception;</li>
     *   <li>an empty list yields no assignment and no subscription;</li>
     *   <li>a list naming a non-existent topic keeps it in the subscription but assigns
     *       only the existing topics;</li>
     *   <li>a literal list entry {@code t.pic} is assigned exactly one topic;</li>
     *   <li>for each regex, the number of assigned partitions must equal the number of
     *       created topic names that {@link Pattern} fully matches locally;</li>
     *   <li>the pattern {@code <stream>:*} must be rejected with
     *       {@link IllegalArgumentException}.</li>
     * </ol>
     *
     * <p>The consumer is always closed, even when an assertion fails.
     *
     * @throws IOException if a topic cannot be created
     */
    @Test
    public void testTopicDotRegexSubscription() throws IOException {
        Properties properties = new Properties();
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put(defaultInstance.getMetadataMaxAge(), 100);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);
        try {
            String[] topicNames = {
                "t-pic", "t.pic", "t_pic", "tApic", "tbpic", "t1pic", "t1picABC",
                "xad", "x123", "x", "abcx123", "-x123"
            };
            for (String topicName : topicNames) {
                madmin.createTopic(STREAMDOT, topicName, numParts);
            }

            // 1a. A list whose only element is null must be rejected.
            ArrayList<String> nullOnly = new ArrayList<String>();
            boolean nullOnlyRejected = false;
            try {
                nullOnly.add(null);
                kafkaConsumer.subscribe(nullOnly);
            } catch (Exception e) {
                System.out.println("subscribe to null topic string... " + e);
                nullOnlyRejected = true;
            }
            Assert.assertTrue(nullOnlyRejected);

            // 1b. A null between valid names must be rejected as well.
            ArrayList<String> nullInside = new ArrayList<String>();
            nullInside.add(STREAMDOT + ":t.pic");
            nullInside.add(null);
            nullInside.add(STREAMDOT + ":testing");
            boolean nullInsideRejected = false;
            try {
                kafkaConsumer.subscribe(nullInside);
            } catch (Exception e) {
                System.out.println("subscribe to null topic in array failed... " + e);
                nullInsideRejected = true;
            }
            Assert.assertTrue(nullInsideRejected);

            // 2. Empty list.
            ArrayList<String> topicList = new ArrayList<String>();
            kafkaConsumer.subscribe(topicList);
            Set<TopicPartition> emptyAssignment = kafkaConsumer.assignment();
            for (TopicPartition partition : emptyAssignment) {
                System.out.println("subscribe unset list: " + partition);
            }
            Assert.assertEquals(0, emptyAssignment.size());
            Assert.assertEquals(0, kafkaConsumer.subscription().size());

            // 3. Two existing topics plus one that does not exist.
            topicList.clear();
            topicList.add(STREAMDOT + ":DOESNOTEXIST");
            topicList.add(STREAMDOT + ":t1pic");
            topicList.add(STREAMDOT + ":x123");
            kafkaConsumer.subscribe(topicList);
            Set<TopicPartition> validAssignment = kafkaConsumer.assignment();
            for (TopicPartition partition : validAssignment) {
                System.out.println("subscribe valid list: " + partition);
            }
            Assert.assertEquals(2, validAssignment.size());
            Assert.assertEquals(3, kafkaConsumer.subscription().size());
            kafkaConsumer.unsubscribe();

            // 4. In a list subscription "t.pic" is a literal name, not a regex.
            topicList.clear();
            topicList.add(STREAMDOT + ":t.pic");
            kafkaConsumer.subscribe(topicList);
            Set<TopicPartition> literalAssignment = kafkaConsumer.assignment();
            for (TopicPartition partition : literalAssignment) {
                System.out.println("subscribe topic t.pic: " + partition);
            }
            Assert.assertEquals(numParts, literalAssignment.size());
            Assert.assertEquals(numParts, kafkaConsumer.subscription().size());
            kafkaConsumer.unsubscribe();

            // 5. Regex subscriptions, cross-checked against java.util.regex on the bare topic names.
            String[] regexes = {
                "t.pic", "t.pic$", "t.{1}pic", "t.{1}pic$", "x.*$", "^x.*$", ".*$",
                "^[:alpha:]*$", "^\\w*$", ".*::.*", ".*:.*"
            };
            for (String regex : regexes) {
                Pattern streamPattern = Pattern.compile(STREAMDOT + ":" + regex);
                Pattern localPattern = Pattern.compile(regex);
                int expectedPartitions = 0;
                for (String topicName : topicNames) {
                    if (localPattern.matcher(topicName).matches()) {
                        expectedPartitions += numParts;
                    }
                }
                kafkaConsumer.subscribe(streamPattern, (ConsumerRebalanceListener) null);
                Set<TopicPartition> regexAssignment = kafkaConsumer.assignment();
                for (TopicPartition partition : regexAssignment) {
                    System.out.println("subscribe regex " + regex + ": " + partition);
                }
                if (regexAssignment.size() != expectedPartitions) {
                    // Dump what the local matcher expected, to help diagnose the failure below.
                    _logger.info("Expected matches for topic pattern " + regex);
                    for (String topicName : topicNames) {
                        if (localPattern.matcher(topicName).matches()) {
                            System.out.println(regex + " matched " + topicName);
                        }
                    }
                }
                Assert.assertEquals(expectedPartitions, regexAssignment.size());
                kafkaConsumer.unsubscribe();
                Assert.assertEquals(0, kafkaConsumer.assignment().size());
            }

            // 6. "<stream>:*" must be refused by the consumer.
            Exception rejection = null;
            try {
                kafkaConsumer.subscribe(Pattern.compile(STREAMDOT + ":*"), (ConsumerRebalanceListener) null);
            } catch (Exception e) {
                rejection = e;
            }
            Assert.assertTrue(rejection instanceof IllegalArgumentException);
            kafkaConsumer.unsubscribe();
        } finally {
            kafkaConsumer.close();
        }
    }

    /**
     * Runs the single-listener regex scenario on {@code STREAM} with a consumer that
     * has no {@code group.id} and no rebalance listener.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testTopicRegexSubscription() throws IOException {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("key.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("value.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.put(Marlinserver.MarlinConfigDefaults.getDefaultInstance().getMetadataMaxAge(), 100);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig);
        runRegexTestWithSingleListener(consumer, STREAM, null);
    }

    /**
     * Runs the single-listener regex scenario on {@code STREAMLG} with a consumer in
     * group {@code regexgroup}, using a {@link RebalanceCb} to wait for rebalances.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testListenerGroupTopicRegexSubscription() throws IOException {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("key.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("value.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("group.id", "regexgroup");
        consumerConfig.put(Marlinserver.MarlinConfigDefaults.getDefaultInstance().getMetadataMaxAge(), 100);

        KafkaConsumer<byte[], byte[]> groupConsumer = new KafkaConsumer<byte[], byte[]>(
                consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        RebalanceCb rebalanceListener = new RebalanceCb();
        runRegexTestWithSingleListener(groupConsumer, STREAMLG, rebalanceListener);
    }

    /**
     * Checks how a regex subscription is shared between two consumers of one group.
     *
     * <p>Nine topics with two partitions each are created; seven of them start with
     * {@code topic}, and the test expects 14 assigned partitions in total for the
     * pattern. The phases are:
     * <ol>
     *   <li>consumer 1 subscribes and must own all 14 partitions;</li>
     *   <li>consumer 2 subscribes and both together must own 14;</li>
     *   <li>consumer 1 unsubscribes and consumer 2 must own all 14;</li>
     *   <li>consumer 2 unsubscribes and neither owns anything;</li>
     *   <li>consumer 1 re-subscribes and must own all 14 again.</li>
     * </ol>
     *
     * <p>Both consumers are always closed, even when an assertion fails.
     *
     * @throws IOException if a topic cannot be created
     */
    @Test
    public void testListenerGroupTwoListenersRegex() throws IOException {
        Properties properties = new Properties();
        Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", "regexgroup");
        properties.put(defaultInstance.getMetadataMaxAge(), 100);
        RebalanceCb rebalanceCb = new RebalanceCb();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(
                properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        RebalanceCb rebalanceCb2 = new RebalanceCb();
        KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<byte[], byte[]>(
                properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        try {
            String[] topicNames = {
                "topic0", "topic1", "topic2", "topic3", "topic4", "topic1A", "topic22", "abc", "12345"
            };
            for (String topicName : topicNames) {
                madmin.createTopic(STREAMTWOLISTENER, topicName, 2);
            }
            // Seven topic names start with "topic", two partitions each.
            final int expectedPartitions = 14;
            Pattern compile = Pattern.compile(STREAMTWOLISTENER + ":^topic.*$");

            // Phase 1: consumer 1 alone owns everything.
            kafkaConsumer.subscribe(compile, rebalanceCb);
            awaitTwoListenerRebalance(rebalanceCb, true, kafkaConsumer, null, expectedPartitions);
            rebalanceCb.clear();
            rebalanceCb2.clear();
            int[] sizes = dumpTwoListenerAssignment("consumer 1 subscribe regexArr", kafkaConsumer, kafkaConsumer2);
            Assert.assertEquals(expectedPartitions, sizes[0]);
            Assert.assertEquals(0, sizes[1]);

            // Phase 2: consumer 2 joins; the partitions are split between the two.
            kafkaConsumer2.subscribe(compile, rebalanceCb2);
            awaitTwoListenerRebalance(rebalanceCb2, true, kafkaConsumer, kafkaConsumer2, expectedPartitions);
            rebalanceCb.clear();
            rebalanceCb2.clear();
            sizes = dumpTwoListenerAssignment("consumer 2 subscribe regexArr", kafkaConsumer, kafkaConsumer2);
            Assert.assertEquals(expectedPartitions, sizes[0] + sizes[1]);

            // Phase 3: consumer 1 leaves; consumer 2 takes over everything.
            kafkaConsumer.unsubscribe();
            awaitTwoListenerRebalance(rebalanceCb, false, kafkaConsumer, null, 0);
            awaitTwoListenerRebalance(rebalanceCb2, true, kafkaConsumer2, null, expectedPartitions);
            rebalanceCb.clear();
            rebalanceCb2.clear();
            sizes = dumpTwoListenerAssignment("consumer 1 unsubscribe regexArr", kafkaConsumer, kafkaConsumer2);
            Assert.assertEquals(0, sizes[0]);
            Assert.assertEquals(expectedPartitions, sizes[1]);

            // Phase 4: consumer 2 leaves as well.
            kafkaConsumer2.unsubscribe();
            awaitTwoListenerRebalance(rebalanceCb2, false, kafkaConsumer2, null, 0);
            rebalanceCb.clear();
            rebalanceCb2.clear();
            sizes = dumpTwoListenerAssignment("consumer 2 unsubscribe regexArr", kafkaConsumer, kafkaConsumer2);
            Assert.assertEquals(0, sizes[0]);
            Assert.assertEquals(0, sizes[1]);

            // Phase 5: consumer 1 comes back and owns everything again.
            kafkaConsumer.subscribe(compile, rebalanceCb);
            awaitTwoListenerRebalance(rebalanceCb, true, kafkaConsumer, null, expectedPartitions);
            rebalanceCb.clear();
            rebalanceCb2.clear();
            sizes = dumpTwoListenerAssignment("consumer 1 re-subscribe regexArr", kafkaConsumer, kafkaConsumer2);
            Assert.assertEquals(expectedPartitions, sizes[0]);
            Assert.assertEquals(0, sizes[1]);
        } finally {
            try {
                kafkaConsumer.close();
            } finally {
                kafkaConsumer2.close();
            }
        }
    }

    /**
     * Repeats "sleep 150 ms, wait for one callback" until the combined assignment size of
     * {@code first} and (when non-null) {@code second} equals {@code expectedTotal}.
     *
     * <p>NOTE(review): there is no retry bound. If the expected count is never observed,
     * or the callback never fires, this blocks forever.
     *
     * @param callback    listener whose notification is awaited on every iteration
     * @param awaitAssign {@code true} to wait for an assignment, {@code false} for a revocation
     */
    private static void awaitTwoListenerRebalance(RebalanceCb callback, boolean awaitAssign,
            KafkaConsumer<byte[], byte[]> first, KafkaConsumer<byte[], byte[]> second, int expectedTotal) {
        while (true) {
            try {
                Thread.sleep(150L);
            } catch (Exception e) {
                System.out.println(e);
            }
            if (awaitAssign) {
                callback.assignDone();
            } else {
                callback.revokeDone();
            }
            int total = first.assignment().size();
            if (second != null) {
                total += second.assignment().size();
            }
            if (total == expectedTotal) {
                return;
            }
        }
    }

    /**
     * Prints the current assignment of both consumers, prefixed with {@code phase}.
     *
     * @return a two-element array: assignment size of {@code first}, then of {@code second}
     */
    private static int[] dumpTwoListenerAssignment(String phase, KafkaConsumer<byte[], byte[]> first,
            KafkaConsumer<byte[], byte[]> second) {
        Set<TopicPartition> firstAssignment = first.assignment();
        for (TopicPartition partition : firstAssignment) {
            System.out.println(phase + ", consumer 1: " + partition);
        }
        Set<TopicPartition> secondAssignment = second.assignment();
        for (TopicPartition partition : secondAssignment) {
            System.out.println(phase + ", consumer 2: " + partition);
        }
        return new int[] {firstAssignment.size(), secondAssignment.size()};
    }

    /**
     * Runs the full-path subscription scenario on {@code STREAMFPLG} with a consumer in
     * group {@code fullpathgroup}, using a {@link RebalanceCb} to wait for the assignment.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testListenerGroupSubscribeFullPath() throws IOException {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("key.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("value.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("group.id", "fullpathgroup");
        consumerConfig.put(Marlinserver.MarlinConfigDefaults.getDefaultInstance().getMetadataMaxAge(), 100);

        KafkaConsumer<byte[], byte[]> groupConsumer = new KafkaConsumer<byte[], byte[]>(
                consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        RebalanceCb rebalanceListener = new RebalanceCb();
        runFullPathTest(groupConsumer, STREAMFPLG, rebalanceListener);
    }

    /**
     * Runs the full-path subscription scenario on {@code STREAMFP} with a consumer that
     * has no {@code group.id} and no rebalance listener.
     *
     * @throws IOException propagated from the shared scenario
     */
    @Test
    public void testListenerSubscribeFullPath() throws IOException {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("key.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("value.deserializer", byteArrayDeserializer);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.put(Marlinserver.MarlinConfigDefaults.getDefaultInstance().getMetadataMaxAge(), 100);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig);
        runFullPathTest(consumer, STREAMFP, null);
    }

    /**
     * Subscribes to one topic under two spellings, the stream-relative path and the
     * same path prefixed with {@code /mapr/my.cluster.com}, and asserts that the
     * consumer ends up with exactly {@code numParts} assigned partitions.
     *
     * <p>The consumer is always closed, even when the assertion fails.
     *
     * @param kafkaConsumer consumer to drive; closed by this method
     * @param str           stream path in which the topic is created
     * @param rebalanceCb   listener used to wait for the assignment, or {@code null} to
     *                      subscribe without a listener and rely on the short sleep only
     * @throws IOException if the topic cannot be created
     */
    private void runFullPathTest(KafkaConsumer kafkaConsumer, String str, RebalanceCb rebalanceCb) throws IOException {
        try {
            madmin.createTopic(str, "topic", numParts);
            ArrayList<String> topicPaths = new ArrayList<String>();
            topicPaths.add(str + ":topic");
            // NOTE(review): hard-coded cluster name; presumably the test cluster's name.
            topicPaths.add("/mapr/my.cluster.com" + str + ":topic");
            if (rebalanceCb != null) {
                kafkaConsumer.subscribe(topicPaths, rebalanceCb);
            } else {
                kafkaConsumer.subscribe(topicPaths);
            }
            try {
                Thread.sleep(150L);
            } catch (Exception e) {
                System.out.println(e);
            }
            if (rebalanceCb != null) {
                rebalanceCb.assignDone();
            }
            Assert.assertEquals(numParts, kafkaConsumer.assignment().size());
        } finally {
            kafkaConsumer.close();
        }
    }

    /**
     * Drives one consumer through a sequence of regex subscriptions on stream {@code str}
     * and asserts the assignment size after each step:
     * <ol>
     *   <li>subscribe to {@code topic.*$} before any topic exists: 0 partitions;</li>
     *   <li>create eight topics: 2 partitions expected;</li>
     *   <li>re-subscribe to {@code ab+$}: {@code numParts} partitions expected;</li>
     *   <li>re-subscribe to {@code [0,1]+.*}: 2 partitions expected;</li>
     *   <li>delete {@code topic1} and {@code 007topic}: {@code numParts} partitions expected;</li>
     *   <li>unsubscribe: 0 partitions.</li>
     * </ol>
     *
     * <p>When {@code rebalanceCb} is non-null the method blocks on its callbacks after each
     * change; otherwise it relies on the 150 ms pauses alone. The consumer is always closed,
     * even when an assertion fails.
     *
     * @param kafkaConsumer consumer to drive; closed by this method
     * @param str           stream path in which the topics are created
     * @param rebalanceCb   rebalance listener to register and wait on, or {@code null}
     * @throws IOException if a topic cannot be created or deleted
     */
    private void runRegexTestWithSingleListener(KafkaConsumer kafkaConsumer, String str, RebalanceCb rebalanceCb) throws IOException {
        try {
            // Step 1: nothing exists yet, so nothing may be assigned.
            kafkaConsumer.subscribe(Pattern.compile(str + ":topic.*$"), rebalanceCb);
            pauseSingleListener();
            Set<?> beforeCreate = dumpSingleListenerAssignment(kafkaConsumer, "SHOULD NOT SEE ANY STREAMS ");
            Assert.assertEquals(0, beforeCreate.size());

            // Step 2: the existing subscription must pick up newly created matching topics.
            String[] topicNames = {"topic1", "topic", "topi", "abcd", "ab", "007topic", "1234567", "234567"};
            for (String topicName : topicNames) {
                madmin.createTopic(str, topicName, numParts);
            }
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.assignDone();
            }
            Set<?> afterCreate = dumpSingleListenerAssignment(kafkaConsumer, "subcribe " + str + " :topic.*$ ");
            Assert.assertEquals(2, afterCreate.size());

            // Step 3: switch to a pattern that should match only "ab".
            kafkaConsumer.subscribe(Pattern.compile(str + ":ab+$"), rebalanceCb);
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.revokeDone();
            }
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.assignDone();
            }
            Set<?> abAssignment = dumpSingleListenerAssignment(kafkaConsumer, "subscribe " + str + ":ab+$ ");
            Assert.assertEquals(numParts, abAssignment.size());

            // Step 4: switch to a pattern for names starting with '0', ',' or '1'.
            kafkaConsumer.subscribe(Pattern.compile(str + ":[0,1]+.*"), rebalanceCb);
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.revokeDone();
            }
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.assignDone();
            }
            Set<?> digitAssignment = dumpSingleListenerAssignment(kafkaConsumer, "subscribe " + str + ":[0,1]+ ");
            Assert.assertEquals(2, digitAssignment.size());

            // Step 5: deleting a matched topic must shrink the assignment.
            madmin.deleteTopic(str, "topic1");
            madmin.deleteTopic(str, "007topic");
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.revokeDone();
            }
            Set<?> afterDelete = dumpSingleListenerAssignment(kafkaConsumer, "deleted topic1 and 007topic");
            Assert.assertEquals(numParts, afterDelete.size());

            // Step 6: unsubscribe releases everything.
            kafkaConsumer.unsubscribe();
            pauseSingleListener();
            if (rebalanceCb != null) {
                rebalanceCb.revokeDone();
            }
            Set<?> afterUnsubscribe = dumpSingleListenerAssignment(kafkaConsumer, "unsubscrbe ");
            Assert.assertEquals(0, afterUnsubscribe.size());
        } finally {
            kafkaConsumer.close();
        }
    }

    /** Sleeps 150 ms to give the consumer time to react; a failed sleep is printed and ignored. */
    private static void pauseSingleListener() {
        try {
            Thread.sleep(150L);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /** Fetches the consumer's current assignment, prints each entry after {@code prefix}, and returns it. */
    private static Set<?> dumpSingleListenerAssignment(KafkaConsumer kafkaConsumer, String prefix) {
        Set<?> assignment = kafkaConsumer.assignment();
        for (Object partition : assignment) {
            System.out.println(prefix + partition);
        }
        return assignment;
    }
}
