package com.mapr.streams.tests.listener;

import com.mapr.db.TableDescriptor;
import com.mapr.db.impl.MapRDBImpl;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.TimestampType;
import com.mapr.streams.TopicDescriptor;
import com.mapr.streams.listener.Listener;
import com.mapr.streams.listener.ListenerV10;
import com.mapr.streams.producer.Producer;
import com.mapr.streams.tests.producer.ProducerMultiTest;
import com.mapr.streams.tests.producer.SendMessagesToProducer;
import com.mapr.streams.tests.producer.SendTimedMessagesToProducer;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ListenerV10Test.class */
public class ListenerV10Test extends BaseTest {
    /** Logger used by the tests; it has no initializer here, so it is assigned elsewhere in the class. */
    private static final Logger _logger;
    /** Prefix prepended to every stream name the tests build; assigned elsewhere in the class. */
    private static final String PREFIX;
    /** Shared streams admin client, created once in {@link #setupTest()}. */
    private static Admin madmin;
    /** Partition count passed when the tests create topics or set a stream's default partitions. */
    private static final int numParts = 4;
    // NOTE(review): looks like the compiler-generated assertion flag surfaced by the decompiler — confirm before relying on it.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerV10Test$CommitCb.class */
    /**
     * Offset-commit callback that records the outcome of a commit and lets another thread
     * block until that outcome has been delivered.
     *
     * <p>Every field is read and written while holding this object's monitor, so a thread that
     * returns from {@link #commitDone()} (or calls a getter afterwards) observes the values
     * stored by {@link #onComplete(Map, Exception)}.
     */
    public final class CommitCb implements OffsetCommitCallback {
        private boolean committed = false;
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
        private Exception committedException = null;

        public CommitCb() {
        }

        /**
         * Stores the commit result and wakes every thread waiting in {@link #commitDone()}.
         *
         * @param map offsets handed to the callback
         * @param exc exception handed to the callback; stored as-is (may be {@code null})
         */
        public synchronized void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.committedOffsets = map;
            this.committedException = exc;
            this.committed = true;
            notifyAll();
        }

        /**
         * Blocks until {@link #onComplete(Map, Exception)} has run.
         *
         * <p>An interrupt does not abort the wait (the callers rely on the result being present
         * afterwards), but the interrupt status is restored before returning instead of being
         * silently discarded.
         */
        public synchronized void commitDone() {
            boolean interrupted = false;
            // Loop guards against spurious wake-ups as well as interrupts.
            while (!this.committed) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Re-interrupting here would make the next wait() throw immediately,
                    // so remember it and restore the flag once the wait is over.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** @return the offsets passed to the callback, or {@code null} if it has not run yet */
        public synchronized Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets() {
            return this.committedOffsets;
        }

        /** @return the exception passed to the callback, or {@code null} if none was recorded */
        public synchronized Exception getException() {
            return this.committedException;
        }
    }

    /**
     * Creates the shared admin client and removes any streams left behind by an earlier run.
     *
     * <p>Deletion is best effort: a stream that does not exist (or cannot be removed) must not
     * prevent the test class from starting, so failures from {@code deleteStream} are ignored.
     * Each stream is listed exactly once, and the list includes the {@code "headers" + 0}
     * stream that {@link #testHeaders()} actually creates.
     *
     * @throws Exception if the admin client cannot be created
     */
    @BeforeClass
    public static void setupTest() throws Exception {
        madmin = Streams.newAdmin(new Configuration());

        String[] leftoverStreams = {
            PREFIX + "pollMaxRecords",
            PREFIX + "listenertimedmessages",
            PREFIX + "listenerlogtimedmessages",
            PREFIX + "listeneredittimedmessages",
            PREFIX + "listenerinterceptoronconsume",
            PREFIX + "listenerinterceptorasync",
            PREFIX + "listenerinterceptorautocommit",
            PREFIX + "listenerinterceptorasynccb",
            PREFIX + "listenerinterceptorasynccbpart",
            PREFIX + "listenerinterceptorsync",
            PREFIX + "listenerinterceptorsyncpart",
            PREFIX + "pollmaxfetchsize" + 0,
            PREFIX + "pollMaxRecords" + 0,
            PREFIX + "headers",
            PREFIX + "headers" + 0,
            PREFIX + "listenertimeindex",
            PREFIX + "listenertimeindexmultiplefeeds",
            PREFIX + "listenerpaused",
            PREFIX + "endoffsetssingle",
            PREFIX + "endoffsetsmultitopics",
            PREFIX + "endoffsetstopictimeout",
            PREFIX + "endoffsetsparttimeout",
            PREFIX + "endoffsetsmultistreams1",
            PREFIX + "endoffsetsmultistreams2",
            PREFIX + "endoffsetsnostream",
            PREFIX + "begoffsetssingle",
            PREFIX + "begoffsetsmultitopics",
            PREFIX + "begoffsetsmultistreams1",
            PREFIX + "begoffsetsmultistreams2",
            PREFIX + "closewithtimeout",
            PREFIX + "closewithlargetimeout",
        };

        for (String streamPath : leftoverStreams) {
            try {
                madmin.deleteStream(streamPath);
            } catch (Exception ignored) {
                // Best effort: the stream usually does not exist on a clean cluster.
            }
        }
    }

    /**
     * Sends 10000 ten-byte records to each partition of a fresh topic, then checks that closing
     * the producer and the consumer with a 50 second timeout each returns in under 2 seconds.
     *
     * <p>The number of polled records is accumulated but not asserted.
     */
    @Test
    public void testCloseWithLargeTimeout() throws Exception {
        final String streamPath = PREFIX + "closewithlargetimeout";
        final String topicPath = streamPath + ":t";

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setCompressionAlgo("off");
        madmin.createStream(streamPath, descriptor);
        madmin.createTopic(streamPath, "t", numParts);

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", true);
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);

        // Same zero-filled key/value buffers are reused for every record.
        byte[] key = new byte[10];
        byte[] value = new byte[10];
        for (int round = 0; round < 10000; round++) {
            for (int partition = 0; partition < numParts; partition++) {
                producer.send(new ProducerRecord<>(topicPath, Integer.valueOf(partition), key, value));
            }
        }

        long producerCloseStart = System.currentTimeMillis();
        producer.close(50L, TimeUnit.SECONDS);
        long producerCloseMillis = System.currentTimeMillis() - producerCloseStart;
        Assert.assertTrue(producerCloseMillis < 2000);

        ArrayList<String> topics = new ArrayList<>();
        topics.add(topicPath);
        consumer.subscribe(topics);
        Assert.assertTrue(consumer.assignment().size() == numParts);

        int polledRecords = 0;
        for (int attempt = 0; attempt < numParts; attempt++) {
            polledRecords += consumer.poll(1000L).count();
        }
        consumer.unsubscribe();

        long consumerCloseStart = System.currentTimeMillis();
        consumer.close(50L, TimeUnit.SECONDS);
        long consumerCloseMillis = System.currentTimeMillis() - consumerCloseStart;
        Assert.assertTrue(consumerCloseMillis < 2000);

        madmin.deleteStream(streamPath);
    }

    /**
     * Sends 10000 ten-byte records to each partition of a fresh topic and closes both the
     * producer and the consumer with a 1 millisecond timeout.
     *
     * <p>There is no timing assertion; the test passes as long as nothing throws and the
     * consumer is assigned {@code numParts} partitions right after subscribing. The number of
     * polled records is accumulated but not asserted.
     */
    @Test
    public void testCloseWithTimeout() throws Exception {
        final String streamPath = PREFIX + "closewithtimeout";
        final String topicPath = streamPath + ":t";

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setCompressionAlgo("off");
        madmin.createStream(streamPath, descriptor);
        madmin.createTopic(streamPath, "t", numParts);

        Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("fetch.min.bytes", "1");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", true);
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);

        // Same zero-filled key/value buffers are reused for every record.
        byte[] key = new byte[10];
        byte[] value = new byte[10];
        for (int round = 0; round < 10000; round++) {
            for (int partition = 0; partition < numParts; partition++) {
                producer.send(new ProducerRecord<>(topicPath, Integer.valueOf(partition), key, value));
            }
        }
        producer.close(1L, TimeUnit.MILLISECONDS);

        ArrayList<String> topics = new ArrayList<>();
        topics.add(topicPath);
        consumer.subscribe(topics);
        Assert.assertTrue(consumer.assignment().size() == numParts);

        int polledRecords = 0;
        for (int attempt = 0; attempt < numParts; attempt++) {
            polledRecords += consumer.poll(1000L).count();
        }
        consumer.unsubscribe();
        consumer.close(1L, TimeUnit.MILLISECONDS);

        madmin.deleteStream(streamPath);
    }

    /**
     * Checks that {@code beginningOffsets} reports 0 for every partition when the requested
     * partitions span two different streams.
     *
     * <p>Topic {@code t1} (first stream) receives 256 messages per partition and {@code t2}
     * (second stream) 128. The {@code t2} partitions are requested in descending order so the
     * request is not sorted.
     *
     * <p>The diagnostic line prints the per-partition message count and the partition actually
     * being asserted, so a failure can be traced to the right entry.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testBegOffsetsMultipleStreams() throws IOException {
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        String firstStream = PREFIX + "begoffsetsmultistreams1";
        madmin.createStream(firstStream, descriptor);
        String secondStream = PREFIX + "begoffsetsmultistreams2";
        madmin.createStream(secondStream, descriptor);
        madmin.createTopic(firstStream, "t1", numParts);
        madmin.createTopic(secondStream, "t2", numParts);
        KafkaProducer<byte[], byte[]> firstProducer = getProducer();
        KafkaProducer<byte[], byte[]> secondProducer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        // Messages per partition, indexed by topic (0 -> t1, 1 -> t2).
        int[] msgsPerPartition = {256, 128};
        new SendMessagesToProducer(firstProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[0] * numParts), firstStream + ":t1", numParts, msgsPerPartition[0]).run();
        new SendMessagesToProducer(secondProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[1] * numParts), secondStream + ":t2", numParts, msgsPerPartition[1]).run();

        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            partitions.add(new TopicPartition(firstStream + ":t1", p));
        }
        for (int p = numParts - 1; p >= 0; p--) {
            partitions.add(new TopicPartition(secondStream + ":t2", p));
        }

        Map<TopicPartition, Long> offsets = consumer.beginningOffsets(partitions);
        System.err.println("out size: " + offsets.size());
        for (int topicIdx = 0; topicIdx < msgsPerPartition.length; topicIdx++) {
            for (int p = 0; p < numParts; p++) {
                int idx = topicIdx * numParts + p;
                TopicPartition tp = partitions.get(idx);
                Long begin = offsets.get(tp);
                System.err.println("numMsgs : " + msgsPerPartition[topicIdx] + " out Offset : " + begin + " index : " + idx);
                Assert.assertNotNull("no beginning offset returned for " + tp, begin);
                Assert.assertEquals("beginning offset of " + tp, 0L, begin.longValue());
            }
        }
        madmin.deleteStream(firstStream);
        madmin.deleteStream(secondStream);
    }

    /**
     * Checks that {@code beginningOffsets} reports 0 for every partition when the requested
     * partitions belong to two topics of the same stream.
     *
     * <p>Topic {@code t1} receives 256 messages per partition and {@code t2} 128. The
     * {@code t1} partitions are requested in descending order so the request is not sorted.
     *
     * <p>The diagnostic line prints the per-partition message count and the partition actually
     * being asserted, so a failure can be traced to the right entry.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testBegOffsetsMultipleTopics() throws IOException {
        String streamPath = PREFIX + "begoffsetsmultitopics";
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        madmin.createTopic(streamPath, "t1", numParts);
        madmin.createTopic(streamPath, "t2", numParts);
        KafkaProducer<byte[], byte[]> firstProducer = getProducer();
        KafkaProducer<byte[], byte[]> secondProducer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        // Messages per partition, indexed by topic (0 -> t1, 1 -> t2).
        int[] msgsPerPartition = {256, 128};
        new SendMessagesToProducer(firstProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[0] * numParts), streamPath + ":t1", numParts, msgsPerPartition[0]).run();
        new SendMessagesToProducer(secondProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[1] * numParts), streamPath + ":t2", numParts, msgsPerPartition[1]).run();

        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = numParts - 1; p >= 0; p--) {
            partitions.add(new TopicPartition(streamPath + ":t1", p));
        }
        for (int p = 0; p < numParts; p++) {
            partitions.add(new TopicPartition(streamPath + ":t2", p));
        }

        Map<TopicPartition, Long> offsets = consumer.beginningOffsets(partitions);
        for (int topicIdx = 0; topicIdx < msgsPerPartition.length; topicIdx++) {
            for (int p = 0; p < numParts; p++) {
                int idx = topicIdx * numParts + p;
                TopicPartition tp = partitions.get(idx);
                Long begin = offsets.get(tp);
                System.err.println("numMsgs : " + msgsPerPartition[topicIdx] + " out Offset : " + begin + " index : " + idx);
                Assert.assertNotNull("no beginning offset returned for " + tp, begin);
                Assert.assertEquals("beginning offset of " + tp, 0L, begin.longValue());
            }
        }
        madmin.deleteStream(streamPath);
    }

    /**
     * Checks {@code beginningOffsets} against a single stream: first for a topic holding 256
     * messages per partition, then for a second, freshly created and empty topic. Every
     * partition of both topics is expected to report 0.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testBegOffsetsSingleTopic() throws IOException {
        final String streamPath = PREFIX + "begoffsetssingle";
        final String populatedTopic = streamPath + ":t";
        final String emptyTopic = streamPath + ":t2";

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);
        new SendMessagesToProducer(producer, new ProducerMultiTest.CountCallback(256 * numParts), populatedTopic, numParts, 256).run();

        // Populated topic.
        List<TopicPartition> populatedPartitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            populatedPartitions.add(new TopicPartition(populatedTopic, p));
        }
        Map<TopicPartition, Long> populatedOffsets = consumer.beginningOffsets(populatedPartitions);
        for (int idx = 0; idx < numParts; idx++) {
            TopicPartition tp = populatedPartitions.get(idx);
            System.err.println("numMsgs : 256 out Offset : " + populatedOffsets.get(tp) + " index : " + idx);
            Assert.assertTrue(populatedOffsets.get(tp).longValue() == 0);
        }

        // Empty topic created after the first query.
        madmin.createTopic(streamPath, "t2", numParts);
        List<TopicPartition> emptyPartitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            emptyPartitions.add(new TopicPartition(emptyTopic, p));
        }
        Map<TopicPartition, Long> emptyOffsets = consumer.beginningOffsets(emptyPartitions);
        for (int idx = 0; idx < numParts; idx++) {
            TopicPartition tp = emptyPartitions.get(idx);
            System.err.println("out Offset : " + emptyOffsets.get(tp) + " index : " + idx);
            Assert.assertTrue(emptyOffsets.get(tp).longValue() == 0);
        }

        madmin.deleteStream(streamPath);
    }

    /**
     * Checks that {@code endOffsets} called with an empty partition list returns an empty map,
     * even though the stream holds a populated topic.
     *
     * <p>Note: this test uses the same stream name ({@code "endoffsetssingle"}) as
     * {@code testEndOffsetsSingleTopic}.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testEndOffsetsNoPartitions() throws IOException {
        final String streamPath = PREFIX + "endoffsetssingle";

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);

        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);
        new SendMessagesToProducer(producer, new ProducerMultiTest.CountCallback(128 * numParts), streamPath + ":t", numParts, 128).run();

        // Nothing is requested, so nothing should come back.
        Map<TopicPartition, Long> offsets = consumer.endOffsets(new ArrayList<TopicPartition>());
        Assert.assertTrue(offsets.size() == 0);

        madmin.deleteStream(streamPath);
    }

    /**
     * Checks that {@code endOffsets} for a partition of a stream that was never created fails
     * with an {@link IllegalArgumentException}.
     *
     * @throws IOException declared for consistency with the sibling tests
     */
    @Test
    public void testEndOffsetsNoStream() throws IOException {
        final String missingStream = PREFIX + "endoffsetsnostream";
        // The descriptor is configured and then discarded; no stream is created from it.
        Streams.newStreamDescriptor().setTimeToLiveSec(0L);
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        ArrayList<TopicPartition> partitions = new ArrayList<>();
        partitions.add(new TopicPartition(missingStream + ":t", 0));

        Exception failure = null;
        try {
            consumer.endOffsets(partitions);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        } catch (Exception thrown) {
            _logger.info("Hit exception " + thrown);
            failure = thrown;
        }
        // A null failure (no exception) also fails this check.
        Assert.assertTrue(failure instanceof IllegalArgumentException);
    }

    /**
     * Exercises {@code endOffsets} when some of the requested partitions do not exist yet.
     *
     * <p>The topic is created with {@code numParts} (4) partitions and 256 messages are sent to
     * each, but end offsets are requested for partitions 0..5. The first call is expected to
     * fail with a {@link TimeoutException}. A background thread then sleeps 5 seconds and edits
     * the topic to 6 partitions while the second call is in flight; that second call is
     * expected to succeed, reporting 256 for the four populated partitions and 0 for the two
     * new ones.
     *
     * @throws Exception if a stream/topic admin operation fails or the join is interrupted
     */
    @Test
    public void testEndOffsetsPartitionTimeout() throws Exception {
        final String str = PREFIX + "endoffsetsparttimeout";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setTimeToLiveSec(0L);
        madmin.createStream(str, newStreamDescriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        madmin.createTopic(str, "t", numParts);
        new SendMessagesToProducer(producer, new ProducerMultiTest.CountCallback(256 * numParts), str + ":t", numParts, 256).run();
        // Dedicated consumer (not getConsumer()) so the RPC timeout can be set explicitly.
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("fetch.min.bytes", "1");
        properties.put("auto.offset.reset", "earliest");
        properties.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        // Ask for 6 partitions although only numParts (4) exist at this point.
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 6; i++) {
            arrayList.add(new TopicPartition(str + ":t", i));
        }
        // First attempt: must end in a TimeoutException (a null exc also fails the check).
        Exception exc = null;
        try {
            kafkaConsumer.endOffsets(arrayList);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        } catch (Exception e) {
            _logger.info("Hit exception " + e);
            exc = e;
        }
        Assert.assertTrue(exc instanceof TimeoutException);
        // Grow the topic to 6 partitions after 5 s, while the second endOffsets call below is running.
        Thread thread = new Thread(new Runnable() { // from class: com.mapr.streams.tests.listener.ListenerV10Test.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(5000L);
                    ListenerV10Test.madmin.editTopic(str, "t", 6);
                } catch (IOException | InterruptedException e2) {
                    // Failures here are not reported; they would surface as a failed second endOffsets call.
                }
            }
        });
        thread.start();
        // Second attempt: expected to complete without an exception.
        Exception exc2 = null;
        Map map = null;
        try {
            map = kafkaConsumer.endOffsets(arrayList);
        } catch (Exception e2) {
            exc2 = e2;
        }
        Assert.assertTrue(exc2 == null);
        TopicPartition[] topicPartitionArr = new TopicPartition[6];
        arrayList.toArray(topicPartitionArr);
        // Original partitions hold the 256 messages sent above.
        for (int i2 = 0; i2 < numParts; i2++) {
            System.err.println("numMsgs : 256 end Offset : " + map.get(topicPartitionArr[i2]) + " index : " + i2);
            Assert.assertTrue(((Long) map.get(topicPartitionArr[i2])).longValue() == ((long) 256));
        }
        // Partitions added by the edit are empty.
        for (int i3 = numParts; i3 < 6; i3++) {
            System.err.println(" End Offset : " + map.get(topicPartitionArr[i3]) + " index : " + i3);
            Assert.assertTrue(((Long) map.get(topicPartitionArr[i3])).longValue() == 0);
        }
        thread.join();
        madmin.deleteStream(str);
    }

    /**
     * Exercises {@code endOffsets} when the requested topic does not exist yet.
     *
     * <p>The stream is created without any topic and the end offset of partition 0 of topic
     * {@code t} is requested. The first call is expected to fail with a
     * {@link TimeoutException}. A background thread then sleeps 5 seconds and creates the topic
     * with one partition while the second call is in flight; that second call is expected to
     * succeed and report 0.
     *
     * @throws Exception if a stream/topic admin operation fails or the join is interrupted
     */
    @Test
    public void testEndOffsetsTopicTimeout() throws Exception {
        Exception exc;
        final String str = PREFIX + "endoffsetstopictimeout";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setTimeToLiveSec(0L);
        madmin.createStream(str, newStreamDescriptor);
        // Dedicated consumer (not getConsumer()) so the RPC timeout can be set explicitly.
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("fetch.min.bytes", "1");
        properties.put("auto.offset.reset", "earliest");
        properties.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        // Topic "t" has not been created at this point.
        ArrayList arrayList = new ArrayList();
        arrayList.add(new TopicPartition(str + ":t", 0));
        // First attempt: must end in a TimeoutException (a null exc also fails the check).
        try {
            exc = null;
            kafkaConsumer.endOffsets(arrayList);
            _logger.info("Test completed without errors !! It was expected to throw exception");
        } catch (Exception e) {
            _logger.info("Hit exception " + e);
            exc = e;
        }
        Assert.assertTrue(exc instanceof TimeoutException);
        // Create the topic after 5 s, while the second endOffsets call below is running.
        Thread thread = new Thread(new Runnable() { // from class: com.mapr.streams.tests.listener.ListenerV10Test.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    Thread.sleep(5000L);
                    ListenerV10Test.madmin.createTopic(str, "t", 1);
                } catch (IOException | InterruptedException e2) {
                    // Failures here are not reported; they would surface as a failed second endOffsets call.
                }
            }
        });
        thread.start();
        // Second attempt: expected to complete without an exception.
        Exception exc2 = null;
        Map map = null;
        try {
            map = kafkaConsumer.endOffsets(arrayList);
        } catch (Exception e2) {
            exc2 = e2;
        }
        Assert.assertTrue(exc2 == null);
        // The array is sized numParts but only slot 0 is filled (the list holds one partition).
        TopicPartition[] topicPartitionArr = new TopicPartition[numParts];
        arrayList.toArray(topicPartitionArr);
        System.err.println(" end Offset : " + map.get(topicPartitionArr[0]) + " index : 0");
        Assert.assertTrue(((Long) map.get(topicPartitionArr[0])).longValue() == 0);
        thread.join();
        madmin.deleteStream(str);
    }

    /**
     * Checks that {@code endOffsets} reports the number of messages sent to each partition
     * when the requested partitions span two different streams.
     *
     * <p>Topic {@code t1} (first stream) receives 256 messages per partition and {@code t2}
     * (second stream) 128. The {@code t2} partitions are requested in descending order so the
     * request is not sorted.
     *
     * <p>The diagnostic line prints the expected count and the partition actually being
     * asserted, so a failure can be traced to the right entry.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testEndOffsetsMultipleStreams() throws IOException {
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        String firstStream = PREFIX + "endoffsetsmultistreams1";
        madmin.createStream(firstStream, descriptor);
        String secondStream = PREFIX + "endoffsetsmultistreams2";
        madmin.createStream(secondStream, descriptor);
        madmin.createTopic(firstStream, "t1", numParts);
        madmin.createTopic(secondStream, "t2", numParts);
        KafkaProducer<byte[], byte[]> firstProducer = getProducer();
        KafkaProducer<byte[], byte[]> secondProducer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        // Messages per partition, indexed by topic (0 -> t1, 1 -> t2).
        int[] msgsPerPartition = {256, 128};
        new SendMessagesToProducer(firstProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[0] * numParts), firstStream + ":t1", numParts, msgsPerPartition[0]).run();
        new SendMessagesToProducer(secondProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[1] * numParts), secondStream + ":t2", numParts, msgsPerPartition[1]).run();

        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            partitions.add(new TopicPartition(firstStream + ":t1", p));
        }
        for (int p = numParts - 1; p >= 0; p--) {
            partitions.add(new TopicPartition(secondStream + ":t2", p));
        }

        Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
        System.err.println("out size: " + offsets.size());
        for (int topicIdx = 0; topicIdx < msgsPerPartition.length; topicIdx++) {
            for (int p = 0; p < numParts; p++) {
                int idx = topicIdx * numParts + p;
                TopicPartition tp = partitions.get(idx);
                Long end = offsets.get(tp);
                System.err.println("numMsgs : " + msgsPerPartition[topicIdx] + " out Offset : " + end + " index : " + idx);
                Assert.assertNotNull("no end offset returned for " + tp, end);
                Assert.assertEquals("end offset of " + tp, (long) msgsPerPartition[topicIdx], end.longValue());
            }
        }
        madmin.deleteStream(firstStream);
        madmin.deleteStream(secondStream);
    }

    /**
     * Checks that {@code endOffsets} reports the number of messages sent to each partition
     * when the requested partitions belong to two topics of the same stream.
     *
     * <p>Topic {@code t1} receives 256 messages per partition and {@code t2} 128. The
     * {@code t1} partitions are requested in descending order so the request is not sorted.
     *
     * <p>The diagnostic line prints the expected count and the partition actually being
     * asserted, so a failure can be traced to the right entry.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testEndOffsetsMultipleTopics() throws IOException {
        String streamPath = PREFIX + "endoffsetsmultitopics";
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        madmin.createTopic(streamPath, "t1", numParts);
        madmin.createTopic(streamPath, "t2", numParts);
        KafkaProducer<byte[], byte[]> firstProducer = getProducer();
        KafkaProducer<byte[], byte[]> secondProducer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        // Messages per partition, indexed by topic (0 -> t1, 1 -> t2).
        int[] msgsPerPartition = {256, 128};
        new SendMessagesToProducer(firstProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[0] * numParts), streamPath + ":t1", numParts, msgsPerPartition[0]).run();
        new SendMessagesToProducer(secondProducer, new ProducerMultiTest.CountCallback(msgsPerPartition[1] * numParts), streamPath + ":t2", numParts, msgsPerPartition[1]).run();

        List<TopicPartition> partitions = new ArrayList<>();
        for (int p = numParts - 1; p >= 0; p--) {
            partitions.add(new TopicPartition(streamPath + ":t1", p));
        }
        for (int p = 0; p < numParts; p++) {
            partitions.add(new TopicPartition(streamPath + ":t2", p));
        }

        Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
        for (int topicIdx = 0; topicIdx < msgsPerPartition.length; topicIdx++) {
            for (int p = 0; p < numParts; p++) {
                int idx = topicIdx * numParts + p;
                TopicPartition tp = partitions.get(idx);
                Long end = offsets.get(tp);
                System.err.println("numMsgs : " + msgsPerPartition[topicIdx] + " out Offset : " + end + " index : " + idx);
                Assert.assertNotNull("no end offset returned for " + tp, end);
                Assert.assertEquals("end offset of " + tp, (long) msgsPerPartition[topicIdx], end.longValue());
            }
        }
        madmin.deleteStream(streamPath);
    }

    /**
     * Checks {@code endOffsets} against a single stream: a topic holding 256 messages per
     * partition must report 256 for every partition, and a second, freshly created topic must
     * report 0 for every partition.
     *
     * @throws IOException if a stream or topic admin operation fails
     */
    @Test
    public void testEndOffsetsSingleTopic() throws IOException {
        final String streamPath = PREFIX + "endoffsetssingle";
        final String populatedTopic = streamPath + ":t";
        final String emptyTopic = streamPath + ":t2";

        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);
        new SendMessagesToProducer(producer, new ProducerMultiTest.CountCallback(256 * numParts), populatedTopic, numParts, 256).run();

        // Populated topic: end offset equals the number of messages sent per partition.
        List<TopicPartition> populatedPartitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            populatedPartitions.add(new TopicPartition(populatedTopic, p));
        }
        Map<TopicPartition, Long> populatedOffsets = consumer.endOffsets(populatedPartitions);
        for (int idx = 0; idx < numParts; idx++) {
            TopicPartition tp = populatedPartitions.get(idx);
            System.err.println("numMsgs : 256 out Offset : " + populatedOffsets.get(tp) + " index : " + idx);
            Assert.assertTrue(populatedOffsets.get(tp).longValue() == ((long) 256));
        }

        // Empty topic created after the first query.
        madmin.createTopic(streamPath, "t2", numParts);
        List<TopicPartition> emptyPartitions = new ArrayList<>();
        for (int p = 0; p < numParts; p++) {
            emptyPartitions.add(new TopicPartition(emptyTopic, p));
        }
        Map<TopicPartition, Long> emptyOffsets = consumer.endOffsets(emptyPartitions);
        for (int idx = 0; idx < numParts; idx++) {
            TopicPartition tp = emptyPartitions.get(idx);
            System.err.println("out Offset : " + emptyOffsets.get(tp) + " index : " + idx);
            Assert.assertTrue(emptyOffsets.get(tp).longValue() == 0);
        }

        madmin.deleteStream(streamPath);
    }

    /**
     * Runs the producer/listener round trip twice on a freshly created stream, once with the
     * trailing flag of both helpers set to {@code true} and once with it set to {@code false}.
     *
     * <p>After each round the stream is deleted and the test sleeps for 10 seconds.
     *
     * @throws IOException if a stream admin operation fails
     */
    @Test
    public void testHeaders() throws IOException {
        final String baseName = PREFIX + "headers";
        final String streamPath = baseName + 0;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        _logger.info("Populate stream and check listener");

        // presumably the flag toggles record headers — confirm against Producer.runTest / ListenerV10
        for (boolean headerFlag : new boolean[] {true, false}) {
            madmin.createStream(streamPath, descriptor);
            boolean produced = Producer.runTest(baseName, 1, 2, 0, numParts, 10000, false, 10, headerFlag);
            Assert.assertTrue(produced);
            boolean consumed = ListenerV10.runTestWithPollOptions(baseName, 1, 2, 0, numParts, 10000, 65536, 131072, 1, true, headerFlag);
            Assert.assertTrue(consumed);
            madmin.deleteStream(streamPath);
            try {
                Thread.sleep(10000L);
            } catch (Exception e) {
            }
        }
    }

    /**
     * Runs five producer/listener rounds on a stream that is recreated for every round, varying
     * three of the arguments handed to the helpers (see the table in the body).
     *
     * <p>The stream is deleted after every round; a 10 second sleep follows each round except
     * the last one.
     *
     * @throws IOException if a stream admin operation fails
     */
    @Test
    public void testPollWithVaryingMaxFetchSize() throws IOException {
        final String baseName = PREFIX + "pollmaxfetchsize";
        final String streamPath = baseName + 0;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);

        // Per round: {4th helper argument, 6th helper argument, 8th listener argument}.
        // presumably {slow-topic count, message count, max fetch size} — confirm against the helpers.
        final int[][] rounds = {
            {0, 10000, 1536},
            {0, 10000, 10240},
            {2, 100000, 1536},
            {2, 100000, 10240},
            {2, 10000, 108},
        };
        // Log line emitted right before the round with the same index starts (null = none).
        final String[] banners = {
            "Populate stream and check listener",
            null,
            "test with topics where message are generated slowly",
            null,
            "test with max fetch size lesser than msg size",
        };

        final int lastRound = rounds.length - 1;
        for (int round = 0; round <= lastRound; round++) {
            if (banners[round] != null) {
                _logger.info(banners[round]);
            }
            int fourthArg = rounds[round][0];
            int sixthArg = rounds[round][1];
            int eighthArg = rounds[round][2];

            madmin.createStream(streamPath, descriptor);
            Assert.assertTrue(Producer.runTest(baseName, 1, 2, fourthArg, numParts, sixthArg, false));
            Assert.assertTrue(Listener.runTestWithPollOptions(baseName, 1, 2, fourthArg, numParts, sixthArg, 131072, eighthArg, Integer.MAX_VALUE));
            madmin.deleteStream(streamPath);

            if (round < lastRound) {
                try {
                    Thread.sleep(10000L);
                } catch (Exception e) {
                }
            }
        }
    }

    /**
     * Runs four produce/consume rounds through {@code Producer.runTest} and
     * {@code ListenerV10.runTestWithPollOptions}, re-creating the stream for every round. The
     * rounds differ in the fourth argument (0 or 2), the sixth argument (10000 or 100000) and the
     * ninth listener argument (1 or 10 - presumably the max records per poll, going by the test
     * name). Every round, including the last, is followed by a 10 second pause.
     *
     * @throws IOException if a stream admin call fails
     */
    @Test
    public void testPollWithVaryingMaxRecords() throws IOException {
        final String streamPrefix = PREFIX + "pollMaxRecords";
        final String streamPath = streamPrefix + 0;
        final StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        // One row per round: {fourth argument, sixth argument, ninth listener argument}.
        final int[][] rounds = {
            {0, 10000, 1},
            {0, 10000, 10},
            {2, 100000, 1},
            {2, 100000, 10},
        };
        _logger.info("Populate stream and check listener");
        for (int round = 0; round < rounds.length; round++) {
            if (round == 2) {
                _logger.info("test with topics where message are generated slowly");
            }
            final int fourthArg = rounds[round][0];
            final int sixthArg = rounds[round][1];
            final int recordsArg = rounds[round][2];
            madmin.createStream(streamPath, descriptor);
            Assert.assertTrue(Producer.runTest(streamPrefix, 1, 2, fourthArg, numParts, sixthArg, false, 10, true));
            Assert.assertTrue(ListenerV10.runTestWithPollOptions(streamPrefix, 1, 2, fourthArg, numParts, sixthArg, 65536, 131072, recordsArg, true, true));
            madmin.deleteStream(streamPath);
            try {
                // Pause after dropping the stream before the next round (or the end of the test).
                Thread.sleep(10000L);
            } catch (Exception ignored) {
                // Best-effort pause; a failed sleep does not affect the outcome.
            }
        }
    }

    /**
     * Verifies that {@code pause}/{@code resume} are reflected by {@code paused()}: after pausing
     * partition 0 of topics t2 and t3 only those two partitions are reported as paused, and after
     * resuming them the paused set is empty.
     *
     * <p>The consumer is closed and the stream deleted in {@code finally} blocks so that a failed
     * assertion does not leak the consumer or leave the stream behind for later runs.
     *
     * @throws Exception if a stream admin or consumer call fails
     */
    @Test
    public void testListenerPaused() throws Exception {
        final String streamPath = PREFIX + "listenerpaused";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());
        try {
            madmin.createTopic(streamPath, "t1", numParts);
            madmin.createTopic(streamPath, "t2", numParts);
            madmin.createTopic(streamPath, "t3", numParts);
            TopicPartition first = new TopicPartition(streamPath + ":t1", 0);
            TopicPartition second = new TopicPartition(streamPath + ":t2", 0);
            TopicPartition third = new TopicPartition(streamPath + ":t3", 0);
            List<TopicPartition> toPause = Arrays.asList(second, third);

            KafkaConsumer<byte[], byte[]> consumer = getConsumer();
            try {
                consumer.assign(Arrays.asList(first, second, third));
                consumer.pause(toPause);
                Set<TopicPartition> paused = consumer.paused();
                Assert.assertTrue(paused.contains(second));
                Assert.assertTrue(paused.contains(third));
                Assert.assertFalse(paused.contains(first));
                consumer.resume(toPause);
                Assert.assertTrue(consumer.paused().isEmpty());
            } finally {
                consumer.close();
            }
        } finally {
            madmin.deleteStream(streamPath);
        }
    }

    /**
     * Feeds 100 timed messages into topic "t" via {@code SendTimedMessagesToProducer} (called
     * with the extra arguments {@code -1} and {@code 1000L}; presumably a descending timestamp
     * sequence starting at 1000, judging by the test name - confirm against that helper) and
     * checks that looking up timestamp 910 on partition 0 yields timestamp 1000 at offset 0.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerTimeIndexDescending() throws Exception {
        final String streamPath = PREFIX + "listenertimeindexdescending";
        final String topicPath = streamPath + ":t";
        final int msgCount = 100;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);
        new SendTimedMessagesToProducer((KafkaProducer) producer, new ProducerMultiTest.CountCallback(msgCount), topicPath, 1, msgCount, -1, 1000L).run();

        TopicPartition partition = new TopicPartition(topicPath, 0);
        Map<TopicPartition, Long> lookup = new HashMap<>();
        lookup.put(partition, 910L);
        OffsetAndTimestamp found = consumer.offsetsForTimes(lookup).get(partition);
        Assert.assertTrue(found.timestamp() == 1000);
        Assert.assertTrue(found.offset() == 0);

        consumer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Exercises {@code offsetsForTimes} on partition 0 of topic "t" across three batches of 256
     * timed messages, sent with the trailing arguments 1000, 15000 and 5000 in that order
     * (presumably the starting timestamp of each batch - confirm against
     * {@code SendTimedMessagesToProducer}).
     *
     * <p>Expected lookups: 1009 gives (timestamp 1009, offset 9) after the first batch; 15001
     * gives (15001, 257) after the second batch and still after the third; 100 gives (1000, 0).
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerTimeIndex() throws Exception {
        final String streamPath = PREFIX + "listenertimeindex";
        final String topicPath = streamPath + ":t";
        final int batchSize = 256;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);
        TopicPartition partition = new TopicPartition(topicPath, 0);
        Map<TopicPartition, Long> lookup = new HashMap<>();

        // First batch.
        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(batchSize), topicPath, 1, batchSize, 1000L).run();
        lookup.put(partition, 1009L);
        OffsetAndTimestamp afterFirst = consumer.offsetsForTimes(lookup).get(partition);
        Assert.assertTrue(afterFirst.timestamp() == 1009);
        Assert.assertTrue(afterFirst.offset() == 9);

        // Second batch, sent through a fresh producer.
        new SendTimedMessagesToProducer(getProducer(), new ProducerMultiTest.CountCallback(batchSize), topicPath, 1, batchSize, 15000L).run();
        lookup.put(partition, 15001L);
        OffsetAndTimestamp afterSecond = consumer.offsetsForTimes(lookup).get(partition);
        Assert.assertTrue(afterSecond.timestamp() == 15001);
        Assert.assertTrue(afterSecond.offset() == 257);

        // Third batch; the same lookup must still resolve to the same entry.
        new SendTimedMessagesToProducer(getProducer(), new ProducerMultiTest.CountCallback(batchSize), topicPath, 1, batchSize, 5000L).run();
        OffsetAndTimestamp afterThird = consumer.offsetsForTimes(lookup).get(partition);
        Assert.assertTrue(afterThird.timestamp() == 15001);
        Assert.assertTrue(afterThird.offset() == 257);

        // A lookup below every timestamp sent resolves to the very first message.
        lookup.put(partition, 100L);
        OffsetAndTimestamp earliest = consumer.offsetsForTimes(lookup).get(partition);
        Assert.assertTrue(earliest.timestamp() == 1000);
        Assert.assertTrue(earliest.offset() == 0);

        consumer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Sets the split size of the stream's backing table to 256 through the MapR-DB admin, sends
     * one million timed messages to topic "t", and checks that looking up timestamp 1000000 on
     * partition 0 returns a timestamp of at least 1000000 at offset 999000.
     *
     * @throws Exception if a stream/table admin, producer or consumer call fails
     */
    @Test
    public void testListenerTimeIndexSplit() throws Exception {
        final String streamPath = PREFIX + "listenertimeindexsplit";
        final String topicPath = streamPath + ":t";
        final int msgCount = 1000000;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        madmin.createTopic(streamPath, "t", numParts);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();

        // Change the split size on the table that has the same path as the stream.
        com.mapr.db.Admin tableAdmin = MapRDBImpl.newAdmin();
        TableDescriptor table = tableAdmin.getTableDescriptor(streamPath);
        table.setSplitSize(256L);
        tableAdmin.alterTable(table);

        new SendTimedMessagesToProducer((KafkaProducer) producer, new ProducerMultiTest.CountCallback(msgCount), topicPath, 1, msgCount, 1000L, 20000).run();

        TopicPartition partition = new TopicPartition(topicPath, 0);
        Map<TopicPartition, Long> lookup = new HashMap<>();
        lookup.put(partition, 1000000L);
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(lookup);
        OffsetAndTimestamp found = result.get(partition);
        _logger.info("timestamp is " + found.timestamp() + " offset is " + found.offset());
        Assert.assertTrue(found.timestamp() >= 1000000);
        Assert.assertTrue(found.offset() == 999000);

        consumer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Issues a single {@code offsetsForTimes} request that spans two streams. Topic "t" of the
     * first stream and topic "t2" of the second each receive 256 timed messages (trailing
     * arguments 1000 and 15000 respectively); the combined lookup must return
     * (timestamp 1009, offset 9) for the first partition and (15001, 1) for the second.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerTimeIndexMultipleFeeds() throws Exception {
        final String firstStream = PREFIX + "listenertimeindexmultiplefeeds";
        final String secondStream = PREFIX + "listenertimeindexmultiplefeeds2";
        final String firstTopic = firstStream + ":t";
        final String secondTopic = secondStream + ":t2";
        final int batchSize = 256;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(firstStream, descriptor);
        madmin.createStream(secondStream, descriptor);
        madmin.createTopic(firstStream, "t", numParts);
        madmin.createTopic(secondStream, "t2", numParts);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(batchSize), firstTopic, 1, batchSize, 1000L).run();
        new SendTimedMessagesToProducer(getProducer(), new ProducerMultiTest.CountCallback(batchSize), secondTopic, 1, batchSize, 15000L).run();

        TopicPartition firstPartition = new TopicPartition(firstTopic, 0);
        TopicPartition secondPartition = new TopicPartition(secondTopic, 0);
        Map<TopicPartition, Long> lookup = new HashMap<>();
        lookup.put(firstPartition, 1009L);
        lookup.put(secondPartition, 15001L);
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(lookup);

        OffsetAndTimestamp firstHit = result.get(firstPartition);
        Assert.assertTrue(firstHit.timestamp() == 1009);
        Assert.assertTrue(firstHit.offset() == 9);
        OffsetAndTimestamp secondHit = result.get(secondPartition);
        Assert.assertTrue(secondHit.timestamp() == 15001);
        Assert.assertTrue(secondHit.offset() == 1);

        consumer.close();
        madmin.deleteStream(firstStream);
        madmin.deleteStream(secondStream);
    }

    /**
     * Checks how {@code offsetsForTimes} behaves when one of the requested topics does not exist.
     * The consumer is configured with {@code streams.rpc.timeout.ms=30000}.
     *
     * <ol>
     *   <li>With topic "t2" missing, the lookup must throw a {@link TimeoutException}.</li>
     *   <li>A background thread then creates "t2" after a 5 second delay while the same lookup is
     *       repeated; this time no exception may be thrown.</li>
     * </ol>
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerTimeIndexTimeout() throws Exception {
        final String str = PREFIX + "listenertimeindextimeout";
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        final int batchSize = 256;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(str, descriptor);
        madmin.createTopic(str, "t", numParts);
        KafkaProducer<byte[], byte[]> producer = getProducer();

        Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", byteArrayDeserializer);
        consumerConfig.put("value.deserializer", byteArrayDeserializer);
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("streams.rpc.timeout.ms", "30000");
        KafkaConsumer<byte[], byte[]> timeoutConsumer = new KafkaConsumer<>(consumerConfig);

        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(batchSize), str + ":t", 1, batchSize, 1000L).run();

        Map<TopicPartition, Long> lookup = new HashMap<>();
        lookup.put(new TopicPartition(str + ":t", 0), 1009L);
        // Topic "t2" has not been created yet.
        lookup.put(new TopicPartition(str + ":t2", 0), 15001L);

        Exception firstFailure = null;
        try {
            timeoutConsumer.offsetsForTimes(lookup);
        } catch (Exception caught) {
            firstFailure = caught;
        }
        Assert.assertTrue(firstFailure instanceof TimeoutException);

        // Create the missing topic from another thread while the second lookup is in flight.
        Thread topicCreator = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000L);
                    ListenerV10Test.madmin.createTopic(str, "t2", ListenerV10Test.numParts);
                } catch (IOException | InterruptedException failure) {
                    ListenerV10Test._logger.info("ignoring exception " + failure);
                }
            }
        });
        topicCreator.start();

        Exception secondFailure = null;
        try {
            timeoutConsumer.offsetsForTimes(lookup);
        } catch (Exception caught) {
            secondFailure = caught;
        }
        Assert.assertTrue(secondFailure == null);

        topicCreator.join();
        timeoutConsumer.close();
        madmin.deleteStream(str);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v3, types: [org.apache.kafka.common.TopicPartition, long] */
    @Test
    public void testListenerTimedMessages() throws Exception {
        String str = PREFIX + "listenertimedmessages";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setTimeToLiveSec(0L);
        madmin.createStream(str, newStreamDescriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(str, "t", numParts);
        long j = 1000;
        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(256 * 1), str + ":t", 1, 256, 1000L).run();
        ?? topicPartition = new TopicPartition(str + ":t", 0);
        consumer.assign(Arrays.asList(topicPartition));
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 == 256) {
                consumer.close();
                madmin.deleteStream(str);
                return;
            }
            ConsumerRecords poll = consumer.poll(100L);
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                ((ConsumerRecord) it.next()).timestamp();
                long j2 = j;
                j = j2 + 1;
                Assert.assertTrue(topicPartition == j2);
            }
            i = i2 + poll.count();
        }
    }

    /**
     * Sends four key-less, value-less records to partition 0 of topic "t" with explicit
     * timestamps 0, 1, 2 and 3 (flushing after each), then verifies that
     * {@code offsetsForTimes} maps every timestamp {@code n} to offset {@code n}, including
     * timestamp 0.
     *
     * <p>The stream uses its own name; it previously reused the name of the stream created by
     * {@code testListenerTimedMessages}, so a leftover stream from one test could break the other.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerZeroEventTime() throws Exception {
        final String streamPath = PREFIX + "listenerzeroeventtime";
        final String topicPath = streamPath + ":t";
        final long lastTimestamp = 3L;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);

        for (long timestamp = 0L; timestamp <= lastTimestamp; timestamp++) {
            producer.send(new ProducerRecord<byte[], byte[]>(topicPath, 0, Long.valueOf(timestamp), (byte[]) null, (byte[]) null));
            // Flush after every record, as the original loop did.
            producer.flush();
        }

        TopicPartition partition = new TopicPartition(topicPath, 0);
        Map<TopicPartition, Long> lookup = new HashMap<>();
        for (long timestamp = 0L; timestamp <= lastTimestamp; timestamp++) {
            lookup.put(partition, timestamp);
            OffsetAndTimestamp found = consumer.offsetsForTimes(lookup).get(partition);
            Assert.assertEquals(timestamp, found.timestamp());
            Assert.assertEquals(timestamp, found.offset());
        }

        producer.close();
        consumer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Builds a new byte-array producer with
     * {@code streams.parallel.flushers.per.partition} set to {@code "false"}.
     *
     * @return a new producer; the caller decides when to close it
     */
    private KafkaProducer<byte[], byte[]> getProducer() {
        final String byteArraySerializer = "org.apache.kafka.common.serialization.ByteArraySerializer";
        final Properties config = new Properties();
        config.put("key.serializer", byteArraySerializer);
        config.put("value.serializer", byteArraySerializer);
        config.put("streams.parallel.flushers.per.partition", "false");
        KafkaProducer<byte[], byte[]> created = new KafkaProducer<>(config);
        return created;
    }

    /**
     * Builds a new byte-array consumer with {@code fetch.min.bytes=1} and
     * {@code auto.offset.reset=earliest}.
     *
     * @return a new consumer; the caller decides when to close it
     */
    private KafkaConsumer<byte[], byte[]> getConsumer() {
        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
        final Properties config = new Properties();
        config.put("key.deserializer", byteArrayDeserializer);
        config.put("value.deserializer", byteArrayDeserializer);
        config.put("fetch.min.bytes", "1");
        config.put("auto.offset.reset", "earliest");
        KafkaConsumer<byte[], byte[]> created = new KafkaConsumer<>(config);
        return created;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r3v3, types: [org.apache.kafka.common.TopicPartition, long] */
    @Test
    public void testListenerLogTimedMessages() throws Exception {
        String str = PREFIX + "listenerlogtimedmessages";
        StreamDescriptor newStreamDescriptor = Streams.newStreamDescriptor();
        newStreamDescriptor.setDefaultTimestampType(TimestampType.LOG_APPEND_TIME);
        madmin.createStream(str, newStreamDescriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(str, "t", numParts);
        long j = 1000;
        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(256 * 1), str + ":t", 1, 256, 1000L).run();
        ?? topicPartition = new TopicPartition(str + ":t", 0);
        consumer.assign(Arrays.asList(topicPartition));
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 == 256) {
                consumer.close();
                madmin.deleteStream(str);
                return;
            }
            ConsumerRecords poll = consumer.poll(100L);
            Iterator it = poll.iterator();
            while (it.hasNext()) {
                ((ConsumerRecord) it.next()).timestamp();
                long j2 = j;
                j = j2 + 1;
                Assert.assertTrue(topicPartition != j2);
            }
            i = i2 + poll.count();
        }
    }

    /**
     * Verifies that editing a topic's timestamp type affects subsequently produced records. The
     * stream defaults to {@code LOG_APPEND_TIME}; the first 256 records read must report that
     * type. The topic is then edited to {@code CREATE_TIME}, another 256 records are sent through
     * a fresh producer, and those must report {@code CREATE_TIME}.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testListenerEditTimedMessages() throws Exception {
        final String streamPath = PREFIX + "listeneredittimedmessages";
        final String topicPath = streamPath + ":t";
        final int batchSize = 256;
        StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultTimestampType(TimestampType.LOG_APPEND_TIME);
        descriptor.setTimeToLiveSec(0L);
        madmin.createStream(streamPath, descriptor);
        KafkaProducer<byte[], byte[]> producer = getProducer();
        KafkaConsumer<byte[], byte[]> consumer = getConsumer();
        madmin.createTopic(streamPath, "t", numParts);

        // Phase 1: records produced under the stream default.
        new SendTimedMessagesToProducer(producer, new ProducerMultiTest.CountCallback(batchSize), topicPath, 1, batchSize, 1000L).run();
        consumer.assign(Arrays.asList(new TopicPartition(topicPath, 0)));
        int seenBeforeEdit = 0;
        while (seenBeforeEdit != batchSize) {
            ConsumerRecords<byte[], byte[]> polled = consumer.poll(100L);
            for (ConsumerRecord<byte[], byte[]> record : polled) {
                Assert.assertTrue(record.timestampType().equals(org.apache.kafka.common.record.TimestampType.LOG_APPEND_TIME));
            }
            seenBeforeEdit += polled.count();
        }

        // Phase 2: switch the topic to CREATE_TIME and produce again.
        TopicDescriptor topicEdit = Streams.newTopicDescriptor();
        topicEdit.setTimestampType(TimestampType.CREATE_TIME);
        madmin.editTopic(streamPath, "t", topicEdit);
        new SendTimedMessagesToProducer(getProducer(), new ProducerMultiTest.CountCallback(batchSize), topicPath, 1, batchSize, 1000L).run();
        int seenAfterEdit = 0;
        while (seenAfterEdit != batchSize) {
            ConsumerRecords<byte[], byte[]> polled = consumer.poll(100L);
            for (ConsumerRecord<byte[], byte[]> record : polled) {
                Assert.assertTrue(record.timestampType().equals(org.apache.kafka.common.record.TimestampType.CREATE_TIME));
            }
            seenAfterEdit += polled.count();
        }

        consumer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Shared driver for the interceptor tests: sends messages to {@code str} through
     * {@code SendMessagesToProducer}, subscribes the consumer to that topic, waits two seconds,
     * and then polls twice with a zero timeout. The first poll is expected to return no records
     * and the second exactly {@code i * i2} records.
     *
     * @param str topic to produce to and subscribe to
     * @param kafkaProducer producer handed to {@code SendMessagesToProducer}
     * @param kafkaConsumer consumer that subscribes and polls
     * @param i passed as the count-callback size and as the last sender argument
     * @param i2 passed as the fourth sender argument (callers pass {@code numParts})
     */
    private void interceptorProduceAndConsumeMsgs(String str, KafkaProducer<byte[], byte[]> kafkaProducer, KafkaConsumer<byte[], byte[]> kafkaConsumer, int i, int i2) {
        ProducerMultiTest.CountCallback sendCallback = new ProducerMultiTest.CountCallback(i * 1);
        SendMessagesToProducer sender = new SendMessagesToProducer(kafkaProducer, sendCallback, str, i2, i);
        sender.run();

        List<String> topics = new ArrayList<>();
        topics.add(str);
        kafkaConsumer.subscribe(topics);
        try {
            Thread.sleep(2000L);
        } catch (Exception sleepFailure) {
            System.out.println("Sleep interrupted " + sleepFailure);
        }

        int firstPollCount = kafkaConsumer.poll(0L).count();
        Assert.assertEquals(0L, firstPollCount);
        int secondPollCount = kafkaConsumer.poll(0L).count();
        Assert.assertEquals(i * i2, secondPollCount);
    }

    /**
     * Registers {@code TestConsumerInterceptor} on a consumer with auto-commit disabled, runs the
     * shared produce/consume driver with 10 and {@code numParts}, and checks that the interceptor
     * reports two {@code onConsume} invocations covering {@code 10 * numParts} records.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testInterceptorOnConsume() throws Exception {
        final int perPartition = 10;
        final TestConsumerInterceptor counters = new TestConsumerInterceptor();
        final String streamPath = PREFIX + "listenerinterceptoronconsume";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());

        final Properties producerConfig = new Properties();
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);

        final Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);

        madmin.createTopic(streamPath, "t", numParts);
        counters.resetCounters();
        interceptorProduceAndConsumeMsgs(streamPath + ":t", producer, consumer, perPartition, numParts);

        // The driver polls twice, hence two onConsume invocations.
        Assert.assertEquals(2, counters.getOnConsumeCount());
        Assert.assertEquals(perPartition * numParts, counters.getNumRecordsOnConsume());

        consumer.unsubscribe();
        consumer.close();
        producer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Registers {@code TestConsumerInterceptor} on a consumer with auto-commit enabled at a one
     * second interval, runs the shared produce/consume driver, waits five seconds, and requires
     * that the interceptor has counted at least one {@code onCommit} invocation.
     *
     * <p>The check is a JUnit assertion rather than a Java {@code assert} so that it is enforced
     * even when the JVM runs without {@code -ea}.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testInterceptorOnCommitAutoCommit() throws Exception {
        TestConsumerInterceptor testConsumerInterceptor = new TestConsumerInterceptor();
        String streamPath = PREFIX + "listenerinterceptorautocommit";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());

        Properties producerConfig = new Properties();
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig);

        Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", true);
        consumerConfig.put("auto.commit.interval.ms", 1000);
        consumerConfig.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

        madmin.createTopic(streamPath, "t", numParts);
        testConsumerInterceptor.resetCounters();
        interceptorProduceAndConsumeMsgs(streamPath + ":t", kafkaProducer, kafkaConsumer, 10, numParts);
        try {
            // Wait several auto-commit intervals.
            Thread.sleep(5000L);
        } catch (Exception e) {
            System.out.println(e);
        }

        long onCommitCount = testConsumerInterceptor.getOnCommitCount();
        System.err.println(" On commit count " + onCommitCount);
        Assert.assertTrue("expected at least one onCommit callback, got " + onCommitCount, onCommitCount > 0);

        testConsumerInterceptor.resetCounters();
        kafkaConsumer.unsubscribe();
        kafkaConsumer.close();
        kafkaProducer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Registers {@code TestConsumerInterceptor} on a consumer in group
     * {@code autoCommitCloseTest} with auto-commit enabled at a 100 second interval, runs the
     * shared produce/consume driver, closes the consumer, and requires that the interceptor has
     * counted at least one {@code onCommit} invocation by then.
     *
     * <p>The check is a JUnit assertion rather than a Java {@code assert} so that it is enforced
     * even when the JVM runs without {@code -ea}.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testInterceptorOnCommitAutoCommitOnClose() throws Exception {
        TestConsumerInterceptor testConsumerInterceptor = new TestConsumerInterceptor();
        String streamPath = PREFIX + "ListenerInterceptorAutoCommitOnClose";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());

        Properties producerConfig = new Properties();
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig);

        Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("group.id", "autoCommitCloseTest");
        consumerConfig.put("enable.auto.commit", true);
        // Interval far longer than the driver's run, as in the original configuration.
        consumerConfig.put("auto.commit.interval.ms", 100000);
        consumerConfig.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

        madmin.createTopic(streamPath, "t", numParts);
        testConsumerInterceptor.resetCounters();
        interceptorProduceAndConsumeMsgs(streamPath + ":t", kafkaProducer, kafkaConsumer, 10, numParts);
        kafkaConsumer.unsubscribe();
        kafkaConsumer.close();
        kafkaProducer.close();

        long onCommitCount = testConsumerInterceptor.getOnCommitCount();
        System.err.println(" On commit count " + onCommitCount);
        Assert.assertTrue("expected at least one onCommit callback, got " + onCommitCount, onCommitCount > 0);

        madmin.deleteStream(streamPath);
    }

    /**
     * Registers {@code TestConsumerInterceptor} on a consumer with auto-commit disabled, runs the
     * shared produce/consume driver, issues {@code commitAsync()} without a callback, waits
     * 300 ms, and checks the interceptor counters: two {@code onConsume} invocations covering
     * {@code 10 * numParts} records, one {@code onCommit} invocation, and four partitions in that
     * commit.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testInterceptorOnCommitASync() throws Exception {
        final int perPartition = 10;
        final TestConsumerInterceptor counters = new TestConsumerInterceptor();
        final String streamPath = PREFIX + "listenerinterceptorasync";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());

        final Properties producerConfig = new Properties();
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);

        final Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);

        madmin.createTopic(streamPath, "t", numParts);
        counters.resetCounters();
        interceptorProduceAndConsumeMsgs(streamPath + ":t", producer, consumer, perPartition, numParts);

        consumer.commitAsync();
        try {
            // Give the asynchronous commit time to complete before reading the counters.
            Thread.sleep(300L);
        } catch (Exception sleepFailure) {
            System.out.println(sleepFailure);
        }

        Assert.assertEquals(2, counters.getOnConsumeCount());
        Assert.assertEquals(perPartition * numParts, counters.getNumRecordsOnConsume());
        Assert.assertEquals(1L, counters.getOnCommitCount());
        Assert.assertEquals(4L, counters.getNumParttionsOnCommit());

        counters.resetCounters();
        consumer.unsubscribe();
        consumer.close();
        producer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Same scenario as the callback-less async commit test, but the commit is issued with a
     * {@code CommitCb} callback and the test calls {@code commitDone()} on it before reading the
     * interceptor counters (presumably blocking until the callback has fired - confirm against
     * {@code CommitCb}). Expected counters: two {@code onConsume} invocations covering
     * {@code 10 * numParts} records, one {@code onCommit} invocation, and four partitions in that
     * commit.
     *
     * @throws Exception if a stream admin, producer or consumer call fails
     */
    @Test
    public void testInterceptorOnCommitASyncCb() throws Exception {
        final int perPartition = 10;
        final TestConsumerInterceptor counters = new TestConsumerInterceptor();
        final String streamPath = PREFIX + "listenerinterceptorasynccb";
        madmin.createStream(streamPath, Streams.newStreamDescriptor());

        final Properties producerConfig = new Properties();
        producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig);

        final Properties consumerConfig = new Properties();
        consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerConfig.put("fetch.min.bytes", "1");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("enable.auto.commit", false);
        consumerConfig.put("interceptor.classes", TestConsumerInterceptor.class.getName());
        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig);

        madmin.createTopic(streamPath, "t", numParts);
        counters.resetCounters();
        interceptorProduceAndConsumeMsgs(streamPath + ":t", producer, consumer, perPartition, numParts);

        final CommitCb commitCallback = new CommitCb();
        consumer.commitAsync(commitCallback);
        commitCallback.commitDone();

        Assert.assertEquals(2, counters.getOnConsumeCount());
        Assert.assertEquals(perPartition * numParts, counters.getNumRecordsOnConsume());
        Assert.assertEquals(1L, counters.getOnCommitCount());
        Assert.assertEquals(4L, counters.getNumParttionsOnCommit());

        counters.resetCounters();
        consumer.unsubscribe();
        consumer.close();
        producer.close();
        madmin.deleteStream(streamPath);
    }

    /**
     * Checks the consumer-interceptor counters after an asynchronous commit, with a
     * completion callback, of explicit offsets for only the even-numbered partitions.
     *
     * <p>The test creates a stream with topic {@code t}, produces and consumes 10
     * messages per partition through {@code interceptorProduceAndConsumeMsgs}, calls
     * {@code commitAsync(offsets, callback)} followed by {@code commitDone()} on the
     * callback, and then asserts the on-consume and on-commit counters.
     *
     * <p>The producer, the consumer and the stream are released even when an
     * assertion fails, so a failing run does not leave clients open or a stale
     * stream behind for later tests.
     *
     * @throws Exception if stream administration, producing, consuming or committing fails
     */
    @Test
    public void testInterceptorOnCommitASyncCbPart() throws Exception {
        // The consumer instantiates its own interceptor from "interceptor.classes";
        // this instance is only used to reset and read the counters
        // (presumably shared static state -- confirm in TestConsumerInterceptor).
        TestConsumerInterceptor testConsumerInterceptor = new TestConsumerInterceptor();
        String str = PREFIX + "listenerinterceptorasynccbpart";
        String topic = str + ":t";

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("enable.auto.commit", false);
        properties2.put("interceptor.classes", TestConsumerInterceptor.class.getName());

        madmin.createStream(str, Streams.newStreamDescriptor());
        // Resources close in reverse declaration order (consumer, then producer),
        // and the finally clause runs afterwards -- same order as the original teardown.
        try (KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(properties);
                KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties2)) {
            madmin.createTopic(str, "t", numParts);
            testConsumerInterceptor.resetCounters();
            interceptorProduceAndConsumeMsgs(topic, kafkaProducer, kafkaConsumer, 10, numParts);

            // Commit offsets for every second partition only (0, 2, ...).
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (int part = 0; part < numParts; part += 2) {
                // NOTE(review): '^' is bitwise XOR in Java, not exponentiation
                // (2 ^ 0 == 2, 2 ^ 2 == 0). Kept as-is; the assertions below only
                // look at how many partitions were committed, not at the offsets.
                offsets.put(new TopicPartition(topic, part), new OffsetAndMetadata(2 ^ part));
            }

            CommitCb commitCb = new CommitCb();
            kafkaConsumer.commitAsync(offsets, commitCb);
            // Presumably blocks until the commit callback has fired -- see CommitCb.
            commitCb.commitDone();

            Assert.assertEquals(2, testConsumerInterceptor.getOnConsumeCount());
            Assert.assertEquals(10 * numParts, testConsumerInterceptor.getNumRecordsOnConsume());
            Assert.assertEquals(1L, testConsumerInterceptor.getOnCommitCount());
            // Two committed partitions matches the loop above only when numParts is 3 or 4.
            Assert.assertEquals(2L, testConsumerInterceptor.getNumParttionsOnCommit());
            testConsumerInterceptor.resetCounters();
            kafkaConsumer.unsubscribe();
        } finally {
            madmin.deleteStream(str);
        }
    }

    /**
     * Checks the consumer-interceptor counters after a synchronous commit of all
     * consumed offsets.
     *
     * <p>The test creates a stream with topic {@code t}, produces and consumes 10
     * messages per partition through {@code interceptorProduceAndConsumeMsgs}, calls
     * {@code commitSync()}, and then asserts the on-consume and on-commit counters.
     *
     * <p>The producer, the consumer and the stream are released even when an
     * assertion fails, so a failing run does not leave clients open or a stale
     * stream behind for later tests.
     *
     * @throws Exception if stream administration, producing, consuming or committing fails
     */
    @Test
    public void testInterceptorOnCommitSync() throws Exception {
        // The consumer instantiates its own interceptor from "interceptor.classes";
        // this instance is only used to reset and read the counters
        // (presumably shared static state -- confirm in TestConsumerInterceptor).
        TestConsumerInterceptor testConsumerInterceptor = new TestConsumerInterceptor();
        String str = PREFIX + "listenerinterceptorsync";

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("enable.auto.commit", false);
        properties2.put("interceptor.classes", TestConsumerInterceptor.class.getName());

        madmin.createStream(str, Streams.newStreamDescriptor());
        // Resources close in reverse declaration order (consumer, then producer),
        // and the finally clause runs afterwards -- same order as the original teardown.
        try (KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(properties);
                KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties2)) {
            madmin.createTopic(str, "t", numParts);
            testConsumerInterceptor.resetCounters();
            interceptorProduceAndConsumeMsgs(str + ":t", kafkaProducer, kafkaConsumer, 10, numParts);

            kafkaConsumer.commitSync();

            Assert.assertEquals(2, testConsumerInterceptor.getOnConsumeCount());
            Assert.assertEquals(10 * numParts, testConsumerInterceptor.getNumRecordsOnConsume());
            Assert.assertEquals(1L, testConsumerInterceptor.getOnCommitCount());
            // NOTE(review): the literal 4 presumably equals numParts -- confirm.
            Assert.assertEquals(4L, testConsumerInterceptor.getNumParttionsOnCommit());
            kafkaConsumer.unsubscribe();
        } finally {
            madmin.deleteStream(str);
        }
    }

    /**
     * Checks the consumer-interceptor counters after a synchronous commit of explicit
     * offsets for only the even-numbered partitions.
     *
     * <p>The test creates a stream with topic {@code t}, produces and consumes 10
     * messages per partition through {@code interceptorProduceAndConsumeMsgs}, calls
     * {@code commitSync(offsets)}, and then asserts the on-consume and on-commit
     * counters.
     *
     * <p>The producer, the consumer and the stream are released even when an
     * assertion fails, so a failing run does not leave clients open or a stale
     * stream behind for later tests.
     *
     * @throws Exception if stream administration, producing, consuming or committing fails
     */
    @Test
    public void testInterceptorOnCommitSyncPart() throws Exception {
        // The consumer instantiates its own interceptor from "interceptor.classes";
        // this instance is only used to reset and read the counters
        // (presumably shared static state -- confirm in TestConsumerInterceptor).
        TestConsumerInterceptor testConsumerInterceptor = new TestConsumerInterceptor();
        String str = PREFIX + "listenerinterceptorsyncpart";
        String topic = str + ":t";

        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Properties properties2 = new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties2.put("fetch.min.bytes", "1");
        properties2.put("auto.offset.reset", "earliest");
        properties2.put("enable.auto.commit", false);
        properties2.put("interceptor.classes", TestConsumerInterceptor.class.getName());

        madmin.createStream(str, Streams.newStreamDescriptor());
        // Resources close in reverse declaration order (consumer, then producer),
        // and the finally clause runs afterwards -- same order as the original teardown.
        try (KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(properties);
                KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties2)) {
            madmin.createTopic(str, "t", numParts);
            testConsumerInterceptor.resetCounters();
            interceptorProduceAndConsumeMsgs(topic, kafkaProducer, kafkaConsumer, 10, numParts);

            // Commit offsets for every second partition only (0, 2, ...).
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (int part = 0; part < numParts; part += 2) {
                // NOTE(review): '^' is bitwise XOR in Java, not exponentiation
                // (2 ^ 0 == 2, 2 ^ 2 == 0). Kept as-is; the assertions below only
                // look at how many partitions were committed, not at the offsets.
                offsets.put(new TopicPartition(topic, part), new OffsetAndMetadata(2 ^ part));
            }
            kafkaConsumer.commitSync(offsets);

            Assert.assertEquals(2, testConsumerInterceptor.getOnConsumeCount());
            Assert.assertEquals(10 * numParts, testConsumerInterceptor.getNumRecordsOnConsume());
            Assert.assertEquals(1L, testConsumerInterceptor.getOnCommitCount());
            // Two committed partitions matches the loop above only when numParts is 3 or 4.
            Assert.assertEquals(2L, testConsumerInterceptor.getNumParttionsOnCommit());
            kafkaConsumer.unsubscribe();
        } finally {
            madmin.deleteStream(str);
        }
    }

    static {
        $assertionsDisabled = !ListenerV10Test.class.desiredAssertionStatus();
        _logger = LoggerFactory.getLogger(ListenerV10Test.class);
        PREFIX = "/jtest-" + ListenerV10Test.class.getSimpleName() + "-";
    }
}
