package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ProducerAndListenerDefaultStream.class */
public class ProducerAndListenerDefaultStream extends BaseTest {
    // Stream path that the tests configure as "streams.producer.default.stream" /
    // "streams.consumer.default.stream", so unqualified topic names are expected to resolve into it.
    private static final String DEFAULTSTREAM = "/jtest-DEFAULTSTREAM";
    // Stream path used as the consumer default stream by the list-topics test.
    private static final String DEFAULTSTREAMLISTTOPIC = "/jtest-DEFAULTSTREAM-listtopic";
    // Shared admin handle; assigned in setupTestClass() and used to create/delete streams and topics.
    private static Admin madmin;
    // Default partition count applied to the stream descriptor in setupTestClass().
    private static final int numParts = 1;
    // Class logger.
    private static final Logger _logger = LoggerFactory.getLogger(ProducerAndListenerDefaultStream.class);
    // Stream path addressed explicitly by the tests as "<STREAM>:<topic>"; derived from this class's simple name.
    private static final String STREAM = "/jtest-" + ProducerAndListenerDefaultStream.class.getSimpleName();
    // Explicitly addressed stream path used by the list-topics test.
    private static final String STREAMLISTTOPIC = STREAM + "-listtopic";

    /* loaded from: input_file:com/mapr/streams/tests/listener/ProducerAndListenerDefaultStream$RebalanceCb.class */
    public final class RebalanceCb implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        public RebalanceCb() {
        }

        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        public synchronized void revokeDone() {
            while (!this.revoked) {
                try {
                    wait();
                } catch (Exception e) {
                }
            }
            this.revoked = false;
        }

        public synchronized void assignDone() {
            while (!this.assigned) {
                try {
                    wait();
                } catch (Exception e) {
                }
            }
            this.assigned = false;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.assigned = true;
                notifyAll();
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.revoked = true;
                notifyAll();
            }
        }
    }

    /**
     * Creates the admin handle and (re)creates the four streams used by this class.
     *
     * <p>Each stream is first deleted on a best-effort basis so that leftovers from a previous
     * run do not affect this one, then created with {@code numParts} default partitions.
     *
     * @throws Exception if the admin handle or any stream cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        madmin = Streams.newAdmin(new Configuration());

        final String[] streams = {STREAM, DEFAULTSTREAM, STREAMLISTTOPIC, DEFAULTSTREAMLISTTOPIC};
        for (String stream : streams) {
            deleteStreamQuietly(stream);
        }

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        for (String stream : streams) {
            madmin.createStream(stream, descriptor);
        }
    }

    /**
     * Deletes {@code stream} if possible. A failure is expected when the stream does not exist,
     * so it is logged at debug level rather than propagated.
     */
    private static void deleteStreamQuietly(String stream) {
        try {
            madmin.deleteStream(stream);
        } catch (Exception ignored) {
            _logger.debug("Ignoring failure to delete stream {} during test setup", stream, ignored);
        }
    }

    /**
     * Deletes the four test streams in a fixed order once all tests have run.
     *
     * @throws Exception from the first deletion that fails; later streams are then not attempted
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        final String[] streamsToDrop = {STREAM, DEFAULTSTREAM, STREAMLISTTOPIC, DEFAULTSTREAMLISTTOPIC};
        for (final String streamPath : streamsToDrop) {
            madmin.deleteStream(streamPath);
        }
    }

    /** Upper bound on how long the consume phase polls before the test fails instead of hanging. */
    private static final long CONSUME_TIMEOUT_MS = 120000L;

    /**
     * Sends {@code i} records to topic {@code str} twice - once through the unqualified name
     * (expected to resolve into {@code DEFAULTSTREAM}) and once through the explicit
     * {@code STREAM:str} path - verifies the returned metadata, and optionally reads the records
     * back.
     *
     * <p>The producer is always closed by this method. The consumer, when non-null, is also
     * always closed, including when a send or an assertion fails.
     *
     * @param i             number of records to send to each of the two topics
     * @param str           unqualified topic name
     * @param kafkaProducer producer of {@code byte[]} keys and values; closed by this method
     * @param kafkaConsumer consumer used to read the records back, or {@code null} to skip the
     *                      consume phase; closed by this method
     * @throws Exception if sending, waiting for a send result, or consuming fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw parameters are narrowed to byte[] key/value types
    public void testProducerAndConsumer(int i, String str, KafkaProducer kafkaProducer, KafkaConsumer kafkaConsumer) throws Exception {
        final String defaultTopic = DEFAULTSTREAM + ":" + str;
        final String explicitTopic = STREAM + ":" + str;
        final KafkaConsumer<byte[], byte[]> consumer = kafkaConsumer;
        try {
            final List<Future<RecordMetadata>> defaultSends = new ArrayList<Future<RecordMetadata>>();
            final List<Future<RecordMetadata>> explicitSends = new ArrayList<Future<RecordMetadata>>();

            final KafkaProducer<byte[], byte[]> producer = kafkaProducer;
            try {
                for (int n = 0; n < i; n++) {
                    byte[] key = ("key-value" + n).getBytes();
                    byte[] value = ("msg-value" + n).getBytes();
                    explicitSends.add(producer.send(new ProducerRecord<byte[], byte[]>(explicitTopic, key, value)));
                    // Unqualified topic name: the producer is expected to route it to its default stream.
                    defaultSends.add(producer.send(new ProducerRecord<byte[], byte[]>(str, key, value)));
                }
                producer.flush();
            } finally {
                producer.close();
            }

            assertSendResults(defaultSends, defaultTopic);
            assertSendResults(explicitSends, explicitTopic);

            if (consumer == null) {
                return;
            }
            consumeAndVerify(consumer, str, defaultTopic, explicitTopic, i);
        } finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Waits for every send result and checks that it landed in partition 0 of
     * {@code expectedTopic} with strictly increasing offsets.
     */
    private static void assertSendResults(List<Future<RecordMetadata>> sends, String expectedTopic) throws Exception {
        long previousOffset = -1;
        for (Future<RecordMetadata> send : sends) {
            RecordMetadata metadata = send.get();
            Assert.assertEquals(0, metadata.partition());
            Assert.assertTrue("offsets must be strictly increasing for " + expectedTopic,
                    previousOffset < metadata.offset());
            previousOffset = metadata.offset();
            Assert.assertEquals(expectedTopic, metadata.topic());
        }
    }

    /**
     * Subscribes to both spellings of the topic, waits for the assignment callback, and polls
     * until exactly {@code expected} records were read from partition 0 of each topic.
     * Fails if more records than expected arrive or if {@code CONSUME_TIMEOUT_MS} elapses.
     */
    private void consumeAndVerify(KafkaConsumer<byte[], byte[]> consumer, String topic, String defaultTopic,
            String explicitTopic, int expected) {
        List<String> topics = new ArrayList<String>(2);
        topics.add(topic);
        topics.add(explicitTopic);
        RebalanceCb rebalanceCb = new RebalanceCb();
        consumer.subscribe(topics, rebalanceCb);
        rebalanceCb.assignDone();

        final TopicPartition defaultPartition = new TopicPartition(defaultTopic, 0);
        final TopicPartition explicitPartition = new TopicPartition(explicitTopic, 0);
        final long deadline = System.currentTimeMillis() + CONSUME_TIMEOUT_MS;
        int defaultCount = 0;
        int explicitCount = 0;
        do {
            if (System.currentTimeMillis() > deadline) {
                Assert.fail("Timed out after " + CONSUME_TIMEOUT_MS + " ms: read " + defaultCount + " of " + expected
                        + " records from " + defaultTopic + " and " + explicitCount + " of " + expected
                        + " from " + explicitTopic);
            }
            ConsumerRecords<byte[], byte[]> polled = consumer.poll(1000L);
            for (ConsumerRecord<byte[], byte[]> record : polled.records(defaultPartition)) {
                Assert.assertEquals(0, record.partition());
                Assert.assertEquals(defaultTopic, record.topic());
                defaultCount++;
            }
            for (ConsumerRecord<byte[], byte[]> record : polled.records(explicitPartition)) {
                Assert.assertEquals(0, record.partition());
                Assert.assertEquals(explicitTopic, record.topic());
                explicitCount++;
            }
            // Overshooting can never converge to the expected count, so fail right away.
            Assert.assertTrue("read more records than were sent to " + defaultTopic, defaultCount <= expected);
            Assert.assertTrue("read more records than were sent to " + explicitTopic, explicitCount <= expected);
        } while (defaultCount != expected || explicitCount != expected);
    }

    /**
     * Producer-only check: sends 10 records to topic "producertest" through a producer whose
     * default stream is {@code DEFAULTSTREAM}. No consumer is supplied, so only the send
     * metadata is verified by {@code testProducerAndConsumer}.
     */
    @Test
    public void testDefaultStreamNameForProducer() throws Exception {
        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        final Marlinserver.MarlinConfigDefaults configDefaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();

        final Properties producerProps = new Properties();
        producerProps.put("key.serializer", byteArraySerializer);
        producerProps.put("value.serializer", byteArraySerializer);
        // The property key is whatever name the config defaults report for this setting.
        producerProps.put(configDefaults.getParallelFlushersPerPartition(), "false");
        producerProps.put("streams.producer.default.stream", DEFAULTSTREAM);

        final KafkaProducer producer = new KafkaProducer(producerProps);
        testProducerAndConsumer(10, "producertest", producer, null);
    }

    /**
     * Verifies that a consumer configured with {@code DEFAULTSTREAM} as its default stream
     * resolves the unqualified topic "consumertest" into that stream.
     *
     * <p>Checks, in order: subscription and assignment report the fully qualified names;
     * the initial position is 0; offsets committed through the unqualified name can be read
     * back through both the unqualified and the qualified name; and after unsubscribing,
     * a manual {@code assign} with the unqualified name yields the same qualified assignment.
     * The consumer is closed even when an assertion fails.
     */
    @Test
    public void testDefaultStreamNameForConsumer() throws Exception {
        final String topic = "consumertest";
        final String defaultStreamTopic = DEFAULTSTREAM + ":" + topic;
        final String explicitTopic = STREAM + ":" + topic;

        madmin.createTopic(DEFAULTSTREAM, topic);
        madmin.createTopic(STREAM, topic);

        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("streams.consumer.default.stream", DEFAULTSTREAM);

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(properties);
        try {
            List<String> topics = new ArrayList<String>(2);
            topics.add(topic);
            topics.add(explicitTopic);
            RebalanceCb rebalanceCb = new RebalanceCb();
            consumer.subscribe(topics, rebalanceCb);
            rebalanceCb.assignDone();

            Set<String> expectedSubscription = new HashSet<String>();
            expectedSubscription.add(defaultStreamTopic);
            expectedSubscription.add(explicitTopic);
            Set<TopicPartition> expectedAssignment = new HashSet<TopicPartition>();
            expectedAssignment.add(new TopicPartition(defaultStreamTopic, 0));
            expectedAssignment.add(new TopicPartition(explicitTopic, 0));

            Set<String> subscription = consumer.subscription();
            Set<TopicPartition> assignment = consumer.assignment();
            Assert.assertEquals(subscription.size(), assignment.size());
            Assert.assertEquals(expectedSubscription, subscription);
            Assert.assertEquals(expectedAssignment, assignment);
            Assert.assertEquals(0L, consumer.position(new TopicPartition(topic, 0)));

            // Commit through the unqualified name, then read back through both spellings.
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(100L));
            offsets.put(new TopicPartition(explicitTopic, 0), new OffsetAndMetadata(200L));
            consumer.commitSync(offsets);
            Assert.assertEquals(100L, consumer.committed(new TopicPartition(topic, 0)).offset());
            Assert.assertEquals(100L, consumer.committed(new TopicPartition(defaultStreamTopic, 0)).offset());
            Assert.assertEquals(200L, consumer.committed(new TopicPartition(explicitTopic, 0)).offset());

            // Switch from subscription to manual assignment using the unqualified name again.
            consumer.unsubscribe();
            List<TopicPartition> manualAssignment = new ArrayList<TopicPartition>(2);
            manualAssignment.add(new TopicPartition(topic, 0));
            manualAssignment.add(new TopicPartition(explicitTopic, 0));
            consumer.assign(manualAssignment);

            Set<TopicPartition> assignmentAfterAssign = consumer.assignment();
            Assert.assertTrue(consumer.subscription().isEmpty());
            Assert.assertEquals(2, assignmentAfterAssign.size());
            Assert.assertEquals(expectedAssignment, assignmentAfterAssign);
        } finally {
            consumer.close();
        }
    }

    /**
     * Round-trip check: a producer and a consumer that both use {@code DEFAULTSTREAM} as their
     * default stream exchange 10 records on topic "listenerPollTest" via
     * {@code testProducerAndConsumer}.
     */
    @Test
    public void testDefaultStreamNameForProducerAndConsumer() throws Exception {
        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        final Marlinserver.MarlinConfigDefaults configDefaults = Marlinserver.MarlinConfigDefaults.getDefaultInstance();

        // Producer side.
        final Properties producerProps = new Properties();
        producerProps.put("key.serializer", byteArraySerializer);
        producerProps.put("value.serializer", byteArraySerializer);
        producerProps.put(configDefaults.getParallelFlushersPerPartition(), "false");
        producerProps.put("streams.producer.default.stream", DEFAULTSTREAM);
        final KafkaProducer producer = new KafkaProducer(producerProps);

        // Consumer side; reads from the earliest offset so records sent before subscribing are seen.
        final Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", byteArrayDeserializer);
        consumerProps.put("value.deserializer", byteArrayDeserializer);
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("streams.consumer.default.stream", DEFAULTSTREAM);
        final KafkaConsumer consumer = new KafkaConsumer(consumerProps);

        testProducerAndConsumer(10, "listenerPollTest", producer, consumer);
    }

    /**
     * Verifies {@code listTopics} on a consumer without a default stream and on one whose default
     * stream is {@code DEFAULTSTREAMLISTTOPIC}.
     *
     * <p>Creates topics {@code list0..list30} in both list-topic streams, topic {@code listN}
     * having {@code N + 1} partitions, then checks the no-argument, stream-path and regex
     * variants of {@code listTopics}. The regex {@code list.$} is expected to match only the ten
     * single-digit topics. Each consumer is closed even when an assertion fails.
     */
    @Test
    public void testListTopicsForConsumer() throws Exception {
        final int topicCount = 31;
        final int singleDigitTopicCount = 10;
        for (int t = 0; t < topicCount; t++) {
            madmin.createTopic(DEFAULTSTREAMLISTTOPIC, "list" + t, t + 1);
            madmin.createTopic(STREAMLISTTOPIC, "list" + t, t + 1);
        }
        final String explicitPrefix = STREAMLISTTOPIC + ":" + "list";
        final String defaultPrefix = DEFAULTSTREAMLISTTOPIC + ":" + "list";

        // Consumer without a default stream: unqualified queries are expected to find nothing.
        KafkaConsumer<byte[], byte[]> plainConsumer =
                new KafkaConsumer<byte[], byte[]>(listTopicsConsumerProperties(null));
        try {
            Assert.assertEquals(0, plainConsumer.listTopics().size());
            assertListedTopics(plainConsumer.listTopics(STREAMLISTTOPIC), explicitPrefix, topicCount);
            assertListedTopics(plainConsumer.listTopics(DEFAULTSTREAMLISTTOPIC), defaultPrefix, topicCount);
            // Assert on the result of each pattern query (the stale no-arg result was re-checked before).
            Assert.assertEquals(0, plainConsumer.listTopics(Pattern.compile("listA.*")).size());
            Assert.assertEquals(0,
                    plainConsumer.listTopics(Pattern.compile(STREAMLISTTOPIC + ":" + "listA.*")).size());
            Assert.assertEquals(0, plainConsumer.listTopics(Pattern.compile("list.$")).size());
            assertListedTopics(plainConsumer.listTopics(Pattern.compile(STREAMLISTTOPIC + ":" + "list.$")),
                    explicitPrefix, singleDigitTopicCount);
        } finally {
            plainConsumer.close();
        }

        // Consumer with a default stream: unqualified queries are expected to resolve into it.
        KafkaConsumer<byte[], byte[]> defaultStreamConsumer =
                new KafkaConsumer<byte[], byte[]>(listTopicsConsumerProperties(DEFAULTSTREAMLISTTOPIC));
        try {
            Assert.assertEquals(topicCount, defaultStreamConsumer.listTopics().size());
            assertListedTopics(defaultStreamConsumer.listTopics((String) null), defaultPrefix, topicCount);
            assertListedTopics(defaultStreamConsumer.listTopics(DEFAULTSTREAMLISTTOPIC), defaultPrefix, topicCount);
            assertListedTopics(defaultStreamConsumer.listTopics(STREAMLISTTOPIC), explicitPrefix, topicCount);
            Assert.assertEquals(0, defaultStreamConsumer.listTopics(Pattern.compile("listA.*")).size());
            Assert.assertEquals(0,
                    defaultStreamConsumer.listTopics(Pattern.compile(STREAMLISTTOPIC + ":" + "listA.*")).size());
            assertListedTopics(defaultStreamConsumer.listTopics(Pattern.compile("list.$")),
                    defaultPrefix, singleDigitTopicCount);
            assertListedTopics(
                    defaultStreamConsumer.listTopics(Pattern.compile(STREAMLISTTOPIC + ":" + "list.$")),
                    explicitPrefix, singleDigitTopicCount);
        } finally {
            defaultStreamConsumer.close();
        }
    }

    /**
     * Builds byte-array consumer properties that read from the earliest offset.
     *
     * @param defaultStream value for "streams.consumer.default.stream", or {@code null} to leave
     *                      the property unset
     */
    private static Properties listTopicsConsumerProperties(String defaultStream) {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("auto.offset.reset", "earliest");
        if (defaultStream != null) {
            properties.put("streams.consumer.default.stream", defaultStream);
        }
        return properties;
    }

    /**
     * Asserts that {@code topics} holds exactly {@code expectedCount} entries, that every key is
     * {@code topicPrefix} followed by a distinct integer N, and that the entry's partition list
     * has N + 1 elements.
     *
     * @param topics        result of a {@code listTopics} call: topic name to partition list
     * @param topicPrefix   fully qualified topic name without its numeric suffix
     * @param expectedCount number of topics, and of distinct numeric suffixes, expected
     */
    private static void assertListedTopics(Map<?, ?> topics, String topicPrefix, int expectedCount) {
        Assert.assertEquals(expectedCount, topics.size());
        Set<Integer> seenIndexes = new HashSet<Integer>();
        for (Map.Entry<?, ?> entry : topics.entrySet()) {
            String topicName = (String) entry.getKey();
            Assert.assertTrue("unexpected topic " + topicName + ", expected prefix " + topicPrefix,
                    topicName.startsWith(topicPrefix));
            int index = Integer.parseInt(topicName.substring(topicPrefix.length()));
            seenIndexes.add(Integer.valueOf(index));
            Assert.assertEquals("partition count of " + topicName, index + 1, ((List<?>) entry.getValue()).size());
        }
        Assert.assertEquals(expectedCount, seenIndexes.size());
    }
}
