package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({ClusterTest.class})
/* loaded from: input_file:com/mapr/streams/tests/listener/ListenerCommitTest.class */
public class ListenerCommitTest extends BaseTest {
    // Stream administration handle; assigned in setupTestClass and used to create/delete the
    // test stream and to create topics.
    private static Admin madmin;
    // Producer shared by all tests; assigned in setupTestClass.
    private static KafkaProducer producer;
    // Consumer constructed before "group.id" is added to its properties (no consumer group).
    private static KafkaConsumer kc;
    // Consumer constructed with "group.id" = "committest"; used by the "...LG" tests.
    private static KafkaConsumer kclg;
    // Partition count for the stream/topics; the tests also send one record per partition.
    private static final int numParts = 10;
    // NOTE(review): not referenced anywhere in the visible code.
    private static final Logger _logger = LoggerFactory.getLogger(ListenerCommitTest.class);
    // Path of the stream under which every test topic lives ("<stream>:<topic>").
    private static final String STREAM = "/jtest-" + ListenerCommitTest.class.getSimpleName();
    // Record payload: 200 zero-initialised bytes.
    public static final byte[] value = new byte[200];
    // Record key: "abc" encoded with the platform default charset (no charset is specified).
    public static final byte[] key = "abc".getBytes();

    /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerCommitTest$CommitCb.class */
    /**
     * Offset-commit callback that records the outcome of an asynchronous commit and lets the
     * test thread block until that outcome has been delivered.
     *
     * <p>All state is guarded by the callback's own monitor.
     */
    public final class CommitCb implements OffsetCommitCallback {
        private boolean committed = false;
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
        private Exception committedException = null;

        public CommitCb() {
        }

        /**
         * Stores the commit result and wakes every thread blocked in {@link #commitDone()}.
         *
         * @param map offsets reported by the commit
         * @param exc failure reported by the commit, or {@code null}
         */
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            synchronized (this) {
                this.committedOffsets = map;
                this.committedException = exc;
                this.committed = true;
                notifyAll();
            }
        }

        /**
         * Blocks until {@link #onComplete} has been invoked.
         *
         * <p>The wait is uninterruptible, as before, but an interrupt received while waiting
         * is no longer lost: the thread's interrupt status is restored before returning.
         */
        public synchronized void commitDone() {
            boolean interrupted = false;
            try {
                while (!this.committed) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for the callback; re-assert the interrupt afterwards.
                        interrupted = true;
                    }
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** @return the offsets passed to {@link #onComplete}, or {@code null} if not yet called */
        public synchronized Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets() {
            return this.committedOffsets;
        }

        /** @return the exception passed to {@link #onComplete}, or {@code null} */
        public synchronized Exception getException() {
            return this.committedException;
        }
    }

    /* loaded from: input_file:com/mapr/streams/tests/listener/ListenerCommitTest$RebalanceCb.class */
    /**
     * Rebalance listener that turns the assigned/revoked notifications into flags a test
     * thread can block on.
     *
     * <p>All state is guarded by the listener's own monitor. Each {@code ...Done()} call
     * consumes the corresponding flag, so it must be matched by a new notification before it
     * can return again.
     */
    public final class RebalanceCb implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        public RebalanceCb() {
        }

        /** Resets both flags, discarding any notification not yet consumed. */
        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        /**
         * Blocks until a revoke notification has arrived, then consumes it.
         *
         * <p>The wait is uninterruptible; an interrupt received meanwhile is restored on the
         * calling thread before returning.
         */
        public synchronized void revokeDone() {
            boolean interrupted = false;
            try {
                while (!this.revoked) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                this.revoked = false;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Blocks until an assignment notification has arrived, then consumes it.
         *
         * <p>The wait is uninterruptible; an interrupt received meanwhile is restored on the
         * calling thread before returning.
         */
        public synchronized void assignDone() {
            boolean interrupted = false;
            try {
                while (!this.assigned) {
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
                this.assigned = false;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Marks an assignment as seen and wakes all waiters; the partition list is ignored. */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.assigned = true;
                notifyAll();
            }
        }

        /** Marks a revocation as seen and wakes all waiters; the partition list is ignored. */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            synchronized (this) {
                this.revoked = true;
                notifyAll();
            }
        }
    }

    /**
     * One-time fixture: builds the admin handle, the shared producer and the two consumers
     * (one without and one with a group id), then recreates the test stream from scratch.
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        final Configuration hadoopConf = new Configuration();
        final Marlinserver.MarlinConfigDefaults configDefaults =
                Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        madmin = Streams.newAdmin(hadoopConf);

        // Producer: byte-array serializers plus three settings keyed by the config defaults.
        final Properties producerProps = new Properties();
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProps.put(configDefaults.getParallelFlushersPerPartition(), false);
        producerProps.put(configDefaults.getMetadataMaxAge(), 100);
        producerProps.put(configDefaults.getBufferTime(), 5000);
        producer = new KafkaProducer(producerProps);

        // Consumers: manual commits only, reading from the earliest offset.
        final Properties consumerProps = new Properties();
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("enable.auto.commit", false);
        consumerProps.put(configDefaults.getMetadataMaxAge(), 5000);
        // The group-less consumer must be created before "group.id" is added below.
        kc = new KafkaConsumer(consumerProps);
        consumerProps.put("group.id", "committest");
        kclg = new KafkaConsumer(consumerProps);

        // Best-effort removal of a stream left over from an earlier run.
        try {
            madmin.deleteStream(STREAM);
        } catch (Exception ignored) {
            // Any failure here is ignored, exactly as before.
        }

        final StreamDescriptor descriptor = Streams.newStreamDescriptor();
        descriptor.setDefaultPartitions(numParts);
        madmin.createStream(STREAM, descriptor);
    }

    /**
     * One-time teardown: closes the producer and both consumers and deletes the test stream.
     *
     * <p>Every step is attempted even when an earlier one throws, and members that were never
     * initialised (for example because the class setup failed part-way) are skipped instead of
     * causing a {@link NullPointerException}. If several steps fail, the exception of the last
     * failing step is the one that propagates.
     *
     * @throws Exception if closing a client or deleting the stream fails
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        try {
            if (producer != null) {
                producer.close();
            }
        } finally {
            try {
                if (kc != null) {
                    kc.close();
                }
            } finally {
                try {
                    if (kclg != null) {
                        kclg.close();
                    }
                } finally {
                    if (madmin != null) {
                        madmin.deleteStream(STREAM);
                    }
                }
            }
        }
    }

    /** Runs the no-argument {@code commitSync()} scenario with the consumer that has no group id. */
    @Test
    public void testCommitSyncNoParam() throws Exception {
        final String topic = STREAM + ":CommitSyncNoParam";
        commitSyncNoParam(kc, topic);
    }

    /** Runs the no-argument {@code commitSync()} scenario with the consumer that has a group id. */
    @Test
    public void testCommitSyncNoParamLG() throws Exception {
        final String topic = STREAM + ":CommitSyncNoParamLG";
        commitSyncNoParam(kclg, topic);
    }

    /**
     * Runs the explicit-offsets {@code commitSync(Map)} scenario with the consumer that has no
     * group id.
     *
     * <p>This test previously delegated to {@code commitSyncNoParam}, so the explicit-offsets
     * path was never exercised despite the test's name and topic.
     */
    @Test
    public void testCommitSyncWithOffsets() throws Exception {
        commitSyncWithOffsets(kc, STREAM + ":CommitSyncWithOffsets");
    }

    /**
     * Runs the explicit-offsets {@code commitSync(Map)} scenario with the consumer that has a
     * group id.
     *
     * <p>This test previously delegated to {@code commitSyncNoParam}, so the explicit-offsets
     * path was never exercised despite the test's name and topic.
     */
    @Test
    public void testCommitSyncWithOffsetsLG() throws Exception {
        commitSyncWithOffsets(kclg, STREAM + ":CommitSyncWithOffsetsLG");
    }

    /** Runs the no-argument {@code commitAsync()} scenario with the consumer that has no group id. */
    @Test
    public void testCommitAsyncNoParam() throws Exception {
        final String topic = STREAM + ":CommitAsyncNoParam";
        commitAsyncNoParam(kc, topic);
    }

    /** Runs the no-argument {@code commitAsync()} scenario with the consumer that has a group id. */
    @Test
    public void testCommitAsyncNoParamLG() throws Exception {
        final String topic = STREAM + ":CommitAsyncNoParamLG";
        commitAsyncNoParam(kclg, topic);
    }

    /** Runs the {@code commitAsync(callback)} scenario with the consumer that has no group id. */
    @Test
    public void testCommitAsyncCallback() throws Exception {
        final String topic = STREAM + ":CommitAsyncCallback";
        commitAsyncCallback(kc, topic);
    }

    /** Runs the {@code commitAsync(callback)} scenario with the consumer that has a group id. */
    @Test
    public void testCommitAsyncCallbackLG() throws Exception {
        final String topic = STREAM + ":CommitAsyncCallbackLG";
        commitAsyncCallback(kclg, topic);
    }

    /** Runs the {@code commitAsync(offsets, callback)} scenario with the consumer that has no group id. */
    @Test
    public void testCommitAsyncCallbackOffsets() throws Exception {
        final String topic = STREAM + ":CommitAsyncCallbackOffsets";
        commitAsyncCallbackOffsets(kc, topic);
    }

    /** Runs the {@code commitAsync(offsets, callback)} scenario with the consumer that has a group id. */
    @Test
    public void testCommitAsyncCallbackOffsetsLG() throws Exception {
        final String topic = STREAM + ":CommitAsyncCallbackOffsetsLG";
        commitAsyncCallbackOffsets(kclg, topic);
    }

    /**
     * Phase 1 of the commit scenarios: publishes one record to each partition of the topic,
     * subscribes the consumer, polls twice, and checks that nothing is reported as committed
     * yet.
     *
     * <p>An interrupt during the settling pause now propagates as {@link InterruptedException}
     * instead of being silently discarded.
     *
     * @param kafkaConsumer     consumer under test
     * @param str               topic name in {@code <stream>:<topic>} form
     * @param topicPartitionArr output array of length {@code numParts}; filled with the
     *                          topic's partitions
     * @param futureArr         output array of length {@code numParts}; filled with the send
     *                          futures, all completed on return
     * @throws Exception if sending, subscribing or polling fails, or the thread is interrupted
     */
    public void testPhase1(KafkaConsumer kafkaConsumer, String str, TopicPartition[] topicPartitionArr, Future[] futureArr) throws Exception {
        for (int part = 0; part < numParts; part++) {
            topicPartitionArr[part] = new TopicPartition(str, part);
        }
        for (int part = 0; part < numParts; part++) {
            futureArr[part] = producer.send(new ProducerRecord(str, Integer.valueOf(part), key, value));
        }
        producer.flush();
        for (Future pending : futureArr) {
            pending.get();
        }

        RebalanceCb rebalanceCb = new RebalanceCb();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(str);
        kafkaConsumer.subscribe(topics, rebalanceCb);
        // NOTE(review): this waits for the assignment callback before the first poll(), so it
        // relies on the client delivering that callback without a poll -- confirm.
        rebalanceCb.assignDone();

        Thread.sleep(200L);
        kafkaConsumer.poll(100L);
        kafkaConsumer.poll(100L);

        // Before any commit, a lookup may fail, return null, or return offset 0.
        for (int part = 0; part < numParts; part++) {
            boolean lookupFailed = false;
            OffsetAndMetadata committed = null;
            try {
                committed = kafkaConsumer.committed(topicPartitionArr[part]);
            } catch (Exception e) {
                lookupFailed = true;
                System.out.println("committed exception: " + e);
            }
            Assert.assertTrue(
                    "unexpected committed offset " + committed + " for " + topicPartitionArr[part]
                            + " before any commit",
                    lookupFailed || committed == null || committed.offset() == 0);
        }
    }

    /**
     * Phase 2 of the commit scenarios: verifies that the committed offsets follow the first
     * batch, publishes a second batch, and verifies -- both right after the send and again
     * after two polls -- that the committed offsets still follow the FIRST batch, i.e. that
     * neither producing nor polling moves them without a commit.
     *
     * <p>An interrupt during the settling pause now propagates as {@link InterruptedException}
     * instead of being silently discarded, and assertion failures name the partition and the
     * expected/actual offsets.
     *
     * @param kafkaConsumer     consumer under test
     * @param str               topic name in {@code <stream>:<topic>} form
     * @param topicPartitionArr the topic's partitions, indexed by partition id
     * @param futureArr         completed send futures of the first batch
     * @param futureArr2        output array of length {@code numParts}; filled with the send
     *                          futures of the second batch, all completed on return
     * @throws Exception if sending or polling fails, or the thread is interrupted
     */
    public void testPhase2(KafkaConsumer kafkaConsumer, String str, TopicPartition[] topicPartitionArr, Future[] futureArr, Future[] futureArr2) throws Exception {
        assertCommittedIsNextAfter(kafkaConsumer, topicPartitionArr, futureArr);

        for (int part = 0; part < numParts; part++) {
            futureArr2[part] = producer.send(new ProducerRecord(str, Integer.valueOf(part), key, value));
        }
        producer.flush();
        for (Future pending : futureArr2) {
            pending.get();
        }
        assertCommittedIsNextAfter(kafkaConsumer, topicPartitionArr, futureArr);

        Thread.sleep(200L);
        kafkaConsumer.poll(100L);
        kafkaConsumer.poll(100L);
        assertCommittedIsNextAfter(kafkaConsumer, topicPartitionArr, futureArr);
    }

    /** Asserts that each partition's committed offset is one past the offset of its sent record. */
    private void assertCommittedIsNextAfter(KafkaConsumer kafkaConsumer, TopicPartition[] partitions, Future[] sends) throws Exception {
        for (int part = 0; part < numParts; part++) {
            OffsetAndMetadata committed = kafkaConsumer.committed(partitions[part]);
            Assert.assertNotNull("no committed offset for " + partitions[part], committed);
            long expected = ((RecordMetadata) sends[part].get()).offset() + 1;
            Assert.assertEquals("committed offset for " + partitions[part], expected, committed.offset());
        }
    }

    /**
     * Phase 3 of the commit scenarios: checks that every partition's committed offset is one
     * past the offset of the given batch's record, then unsubscribes the consumer.
     *
     * @param kafkaConsumer     consumer under test
     * @param str               topic name (not used by this phase)
     * @param topicPartitionArr the topic's partitions, indexed by partition id
     * @param futureArr         completed send futures of the batch expected to be committed
     */
    public void testPhase3(KafkaConsumer kafkaConsumer, String str, TopicPartition[] topicPartitionArr, Future[] futureArr) throws Exception {
        int part = 0;
        while (part < numParts) {
            final long committedOffset = kafkaConsumer.committed(topicPartitionArr[part]).offset();
            final long nextAfterSent = ((RecordMetadata) futureArr[part].get()).offset() + 1;
            Assert.assertTrue(committedOffset == nextAfterSent);
            part++;
        }
        kafkaConsumer.unsubscribe();
    }

    /**
     * Scenario for the no-argument {@code commitSync()}: runs the three phases with a
     * synchronous commit after phase 1 and another after phase 2.
     *
     * @param kafkaConsumer consumer under test
     * @param str           topic name in {@code <stream>:<topic>} form
     */
    public void commitSyncNoParam(KafkaConsumer kafkaConsumer, String str) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[numParts];
        final Future[] firstBatch = new Future[numParts];
        final Future[] secondBatch = new Future[numParts];

        testPhase1(kafkaConsumer, str, partitions, firstBatch);

        // Commit what phase 1 consumed; phase 2 verifies it against the first batch.
        kafkaConsumer.commitSync();
        testPhase2(kafkaConsumer, str, partitions, firstBatch, secondBatch);

        // Commit what phase 2 consumed; phase 3 verifies it against the second batch.
        kafkaConsumer.commitSync();
        testPhase3(kafkaConsumer, str, partitions, secondBatch);
    }

    /**
     * Scenario for {@code commitSync(Map)}: creates the topic, commits explicit offsets for
     * the even-numbered partitions only and verifies them, then commits explicit offsets for
     * every partition and verifies those.
     *
     * <p>Odd-numbered partitions have no commit after the first step. For those, a failing
     * lookup, a {@code null} result or offset 0 are all accepted -- the same tolerance the
     * asynchronous variant and phase 1 apply -- instead of dereferencing the result
     * unconditionally.
     *
     * @param kafkaConsumer consumer under test
     * @param str           topic name in {@code <stream>:<topic>} form
     * @throws Exception if topic creation, subscription or a commit fails
     */
    public void commitSyncWithOffsets(KafkaConsumer kafkaConsumer, String str) throws Exception {
        String[] streamAndTopic = str.split(":");
        madmin.createTopic(streamAndTopic[0], streamAndTopic[1], numParts);

        TopicPartition[] partitions = new TopicPartition[numParts];
        for (int part = 0; part < numParts; part++) {
            partitions[part] = new TopicPartition(str, part);
        }

        // Even partitions only. Note that '^' is XOR (2, 0, 6, 4, 10), not exponentiation;
        // the verification below uses the same expression.
        Map<TopicPartition, OffsetAndMetadata> evenOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int part = 0; part < numParts; part += 2) {
            evenOffsets.put(partitions[part], new OffsetAndMetadata(2 ^ part));
        }

        RebalanceCb rebalanceCb = new RebalanceCb();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(str);
        kafkaConsumer.subscribe(topics, rebalanceCb);
        rebalanceCb.assignDone();

        kafkaConsumer.commitSync(evenOffsets);
        for (int part = 0; part < numParts; part++) {
            if (part % 2 == 0) {
                OffsetAndMetadata committed = kafkaConsumer.committed(partitions[part]);
                Assert.assertNotNull("no committed offset for " + partitions[part], committed);
                Assert.assertEquals("committed offset for " + partitions[part],
                        (long) (2 ^ part), committed.offset());
            } else {
                boolean lookupFailed = false;
                OffsetAndMetadata committed = null;
                try {
                    committed = kafkaConsumer.committed(partitions[part]);
                } catch (Exception e) {
                    lookupFailed = true;
                    System.out.println("committed exception: " + e);
                }
                Assert.assertTrue(
                        "unexpected committed offset " + committed + " for " + partitions[part],
                        lookupFailed || committed == null || committed.offset() == 0);
            }
        }

        Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int part = 0; part < numParts; part++) {
            allOffsets.put(partitions[part], new OffsetAndMetadata(part + 1024));
        }
        kafkaConsumer.commitSync(allOffsets);
        for (int part = 0; part < numParts; part++) {
            OffsetAndMetadata committed = kafkaConsumer.committed(partitions[part]);
            Assert.assertNotNull("no committed offset for " + partitions[part], committed);
            Assert.assertEquals("committed offset for " + partitions[part],
                    (long) (part + 1024), committed.offset());
        }
        kafkaConsumer.unsubscribe();
    }

    /**
     * Scenario for the no-argument {@code commitAsync()}: runs the three phases with an
     * asynchronous commit after phase 1 and another after phase 2, pausing 300 ms after each
     * commit before verifying.
     *
     * @param kafkaConsumer consumer under test
     * @param str           topic name in {@code <stream>:<topic>} form
     */
    public void commitAsyncNoParam(KafkaConsumer kafkaConsumer, String str) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[numParts];
        final Future[] firstBatch = new Future[numParts];
        final Future[] secondBatch = new Future[numParts];

        testPhase1(kafkaConsumer, str, partitions, firstBatch);

        kafkaConsumer.commitAsync();
        sleepAfterAsyncCommit();
        testPhase2(kafkaConsumer, str, partitions, firstBatch, secondBatch);

        kafkaConsumer.commitAsync();
        sleepAfterAsyncCommit();
        testPhase3(kafkaConsumer, str, partitions, secondBatch);
    }

    /** Sleeps 300 ms; any exception raised by the sleep is printed to stdout and otherwise ignored. */
    private void sleepAfterAsyncCommit() {
        try {
            Thread.sleep(300L);
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    /**
     * Scenario for {@code commitAsync(callback)}: runs the three phases with a callback-based
     * asynchronous commit after phase 1 and another after phase 2. Each commit is awaited
     * through its callback and the offsets it reports are compared with what the consumer
     * returns from {@code committed()}.
     *
     * @param kafkaConsumer consumer under test
     * @param str           topic name in {@code <stream>:<topic>} form
     */
    public void commitAsyncCallback(KafkaConsumer kafkaConsumer, String str) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[numParts];
        final Future[] firstBatch = new Future[numParts];
        final Future[] secondBatch = new Future[numParts];

        testPhase1(kafkaConsumer, str, partitions, firstBatch);
        commitAsyncAndVerifyCallback(kafkaConsumer);

        testPhase2(kafkaConsumer, str, partitions, firstBatch, secondBatch);
        commitAsyncAndVerifyCallback(kafkaConsumer);

        testPhase3(kafkaConsumer, str, partitions, secondBatch);
    }

    /**
     * Issues {@code commitAsync(callback)}, waits for the callback, and asserts that it carried
     * no exception and that every offset it reported equals the consumer's committed offset.
     */
    private void commitAsyncAndVerifyCallback(KafkaConsumer kafkaConsumer) {
        final CommitCb callback = new CommitCb();
        kafkaConsumer.commitAsync(callback);
        callback.commitDone();
        Assert.assertTrue(callback.getException() == null);
        final Map<TopicPartition, OffsetAndMetadata> reported = callback.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> reportedEntry : reported.entrySet()) {
            final OffsetAndMetadata reportedOffset = reportedEntry.getValue();
            Assert.assertTrue(reportedOffset.equals(kafkaConsumer.committed(reportedEntry.getKey())));
        }
    }

    /**
     * Scenario for {@code commitAsync(Map, callback)}: creates the topic, asynchronously
     * commits explicit offsets for the even-numbered partitions only and verifies them, then
     * asynchronously commits explicit offsets for every partition and verifies those.
     *
     * <p>For even-numbered partitions a failed or {@code null} {@code committed()} lookup now
     * produces an assertion failure naming the partition (and the lookup exception, if any)
     * instead of a {@link NullPointerException}. Odd-numbered partitions, which have no commit
     * after the first step, still accept a failing lookup, {@code null}, or offset 0.
     *
     * @param kafkaConsumer consumer under test
     * @param str           topic name in {@code <stream>:<topic>} form
     * @throws Exception if topic creation or subscription fails
     */
    public void commitAsyncCallbackOffsets(KafkaConsumer kafkaConsumer, String str) throws Exception {
        String[] streamAndTopic = str.split(":");
        madmin.createTopic(streamAndTopic[0], streamAndTopic[1], numParts);

        TopicPartition[] partitions = new TopicPartition[numParts];
        for (int part = 0; part < numParts; part++) {
            partitions[part] = new TopicPartition(str, part);
        }

        // Even partitions only. Note that '^' is XOR (2, 0, 6, 4, 10), not exponentiation;
        // the verification below uses the same expression.
        Map<TopicPartition, OffsetAndMetadata> evenOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int part = 0; part < numParts; part += 2) {
            evenOffsets.put(partitions[part], new OffsetAndMetadata(2 ^ part));
        }

        RebalanceCb rebalanceCb = new RebalanceCb();
        ArrayList<String> topics = new ArrayList<String>();
        topics.add(str);
        kafkaConsumer.subscribe(topics, rebalanceCb);
        rebalanceCb.assignDone();

        commitAsyncOffsetsAndVerifyCallback(kafkaConsumer, evenOffsets);
        for (int part = 0; part < numParts; part++) {
            OffsetAndMetadata committed = null;
            Exception lookupFailure = null;
            try {
                committed = kafkaConsumer.committed(partitions[part]);
            } catch (Exception e) {
                lookupFailure = e;
                System.out.println("commited exception: " + e);
            }
            if (part % 2 == 0) {
                Assert.assertNotNull(
                        "no committed offset for " + partitions[part]
                                + (lookupFailure == null ? "" : ": " + lookupFailure),
                        committed);
                Assert.assertEquals("committed offset for " + partitions[part],
                        (long) (2 ^ part), committed.offset());
            } else {
                Assert.assertTrue(
                        "unexpected committed offset " + committed + " for " + partitions[part],
                        lookupFailure != null || committed == null || committed.offset() == 0);
            }
        }

        Map<TopicPartition, OffsetAndMetadata> allOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int part = 0; part < numParts; part++) {
            allOffsets.put(partitions[part], new OffsetAndMetadata(part + 1024));
        }
        commitAsyncOffsetsAndVerifyCallback(kafkaConsumer, allOffsets);
        for (int part = 0; part < numParts; part++) {
            OffsetAndMetadata committed = kafkaConsumer.committed(partitions[part]);
            Assert.assertNotNull("no committed offset for " + partitions[part], committed);
            Assert.assertEquals("committed offset for " + partitions[part],
                    (long) (part + 1024), committed.offset());
        }
        kafkaConsumer.unsubscribe();
    }

    /**
     * Issues {@code commitAsync(offsets, callback)}, waits for the callback, and asserts that
     * it carried no exception and that every offset it reported equals the consumer's
     * committed offset for that partition.
     */
    private void commitAsyncOffsetsAndVerifyCallback(KafkaConsumer kafkaConsumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
        CommitCb callback = new CommitCb();
        kafkaConsumer.commitAsync(offsets, callback);
        callback.commitDone();
        Assert.assertNull("async commit reported " + callback.getException(), callback.getException());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> reported : callback.getCommittedOffsets().entrySet()) {
            Assert.assertEquals("callback offset vs committed() for " + reported.getKey(),
                    reported.getValue(), kafkaConsumer.committed(reported.getKey()));
        }
    }
}
