/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tests.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.tests.BaseTest;
import com.mapr.tests.annotations.ClusterTest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ClusterTest.class})
public class ListenerCommitTest
extends BaseTest {
    /** SLF4J logger for this test class. */
    private static final Logger _logger = LoggerFactory.getLogger(ListenerCommitTest.class);
    /** Path of the internal stream; (re)created during class setup and removed during cleanup. */
    private static final String INTERNAL_STREAM = "/var/mapr/kafka-internal-stream";
    /** Path of the stream that holds every topic used by these tests ("/jtest-" + simple class name). */
    private static final String STREAM = "/jtest-" + ListenerCommitTest.class.getSimpleName();
    /** Admin handle used to create and delete streams and topics. */
    private static Admin madmin;
    /** Shared producer used by all tests to publish records. */
    private static KafkaProducer producer;
    /** Shared consumer used by the test variants without the "LG" suffix. */
    private static KafkaConsumer kc;
    /** Shared consumer used by the "LG" test variants. */
    private static KafkaConsumer kclg;
    // NOTE(review): presumably the partition count behind the literal 10 used in this file
    // (likely inlined by the decompiler) - confirm against the original sources.
    private static final int numParts = 10;
    /** Record payload sent by the producer; assigned in the static initializer. */
    public static final byte[] value;
    /** Record key sent by the producer; assigned in the static initializer. */
    public static final byte[] key;

    /**
     * Builds the base consumer configuration.
     *
     * <p>The result is empty unless the system property {@code TEST_CLIENT_PARTITIONER}
     * equals "true" (case-insensitive); in that case three settings are added:
     * client-side partition assignment, the default internal stream, and the
     * round-robin assignment strategy.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    private static Properties getConsumerProperties() {
        Properties consumerProps = new Properties();
        // A null (unset) property compares unequal, so it falls through to the empty result.
        boolean clientSideAssignment = "true".equalsIgnoreCase(System.getProperty("TEST_CLIENT_PARTITIONER"));
        if (!clientSideAssignment) {
            return consumerProps;
        }
        consumerProps.put("streams.clientside.partition.assignment", "true");
        consumerProps.put("streams.default.internal.stream", INTERNAL_STREAM);
        consumerProps.put("partition.assignment.strategy", RoundRobinAssignor.class.getName());
        return consumerProps;
    }

    /**
     * One-time fixture setup: creates the admin handle, the shared producer, the two
     * shared consumers ({@code kc} without a group id, {@code kclg} with group id
     * "committest"), and (re)creates the test stream and the internal stream.
     *
     * @throws Exception if any client cannot be created or a stream cannot be created
     */
    @BeforeClass
    public static void setupTestClass() throws Exception {
        Configuration conf = new Configuration();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        madmin = Streams.newAdmin(conf);

        // Producer: byte[] keys/values; remaining keys come from the Marlin config defaults.
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), Boolean.FALSE);
        props.put(cdef.getMetadataMaxAge(), Integer.valueOf(100));
        props.put(cdef.getBufferTime(), Integer.valueOf(5000));
        producer = new KafkaProducer(props);

        // Consumers: auto-commit is disabled so that only the explicit commit calls under
        // test move the committed position.
        props = ListenerCommitTest.getConsumerProperties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", Boolean.FALSE);
        props.put(cdef.getMetadataMaxAge(), Integer.valueOf(5000));
        kc = new KafkaConsumer(props);
        // Same configuration plus a group id for the "LG" test variants.
        props.put("group.id", "committest");
        kclg = new KafkaConsumer(props);

        // Best-effort removal of leftovers from a previous run. A failure here is not fatal
        // for setup, but it is logged instead of being dropped silently.
        for (String stream : new String[] {STREAM, INTERNAL_STREAM}) {
            try {
                madmin.deleteStream(stream);
            }
            catch (Exception ignored) {
                _logger.debug("Ignoring failure to delete stream " + stream + " before setup", ignored);
            }
        }

        StreamDescriptor sdesc = Streams.newStreamDescriptor();
        sdesc.setDefaultPartitions(numParts);
        madmin.createStream(INTERNAL_STREAM, sdesc);
        madmin.createStream(STREAM, sdesc);
    }

    /**
     * One-time fixture teardown: closes the producer and both consumers, then deletes
     * the internal stream and the test stream.
     *
     * <p>Every step is attempted even if an earlier one fails, and fields that were
     * never assigned (setup failed part-way) are skipped. The first failure is rethrown
     * at the end with any later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered during cleanup, if any
     */
    @AfterClass
    public static void cleanupTestClass() throws Exception {
        Exception failure = null;
        try {
            if (producer != null) {
                producer.close();
            }
        }
        catch (Exception e) {
            failure = ListenerCommitTest.chainFailure(failure, e);
        }
        try {
            if (kc != null) {
                kc.close();
            }
        }
        catch (Exception e) {
            failure = ListenerCommitTest.chainFailure(failure, e);
        }
        try {
            if (kclg != null) {
                kclg.close();
            }
        }
        catch (Exception e) {
            failure = ListenerCommitTest.chainFailure(failure, e);
        }
        if (madmin != null) {
            for (String stream : new String[] {INTERNAL_STREAM, STREAM}) {
                try {
                    madmin.deleteStream(stream);
                }
                catch (Exception e) {
                    failure = ListenerCommitTest.chainFailure(failure, e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Returns {@code first} with {@code next} added as suppressed, or {@code next} when there is no earlier failure. */
    private static Exception chainFailure(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /** Runs {@link #commitSyncNoParam} with the shared {@code kc} consumer on topic "CommitSyncNoParam". */
    @Test
    public void testCommitSyncNoParam() throws Exception {
        this.commitSyncNoParam(kc, STREAM + ":CommitSyncNoParam");
    }

    /** Runs {@link #commitSyncNoParam} with the shared {@code kclg} consumer on topic "CommitSyncNoParamLG". */
    @Test
    public void testCommitSyncNoParamLG() throws Exception {
        this.commitSyncNoParam(kclg, STREAM + ":CommitSyncNoParamLG");
    }

    /** Runs {@link #commitSyncWithOffsets} with the shared {@code kc} consumer on topic "CommitSyncWithOffsets". */
    @Test
    public void testCommitSyncWithOffsets() throws Exception {
        this.commitSyncWithOffsets(kc, STREAM + ":CommitSyncWithOffsets");
    }

    /** Runs {@link #commitSyncWithOffsets} with the shared {@code kclg} consumer on topic "CommitSyncWithOffsetsLG". */
    @Test
    public void testCommitSyncWithOffsetsLG() throws Exception {
        this.commitSyncWithOffsets(kclg, STREAM + ":CommitSyncWithOffsetsLG");
    }

    /** Runs {@link #commitSyncWithOffsetsAndMetadata} with the shared {@code kc} consumer on topic "CommitSyncWithOffsetsAndMetadata". */
    @Test
    public void testCommitSyncWithOffsetsAndMetadata() throws Exception {
        this.commitSyncWithOffsetsAndMetadata(kc, STREAM + ":CommitSyncWithOffsetsAndMetadata");
    }

    /** Runs {@link #commitSyncWithOffsetsAndMetadata} with the shared {@code kclg} consumer on topic "CommitSyncWithOffsetsAndMetadataLG". */
    @Test
    public void testCommitSyncWithOffsetsAndMetadataLG() throws Exception {
        this.commitSyncWithOffsetsAndMetadata(kclg, STREAM + ":CommitSyncWithOffsetsAndMetadataLG");
    }

    /** Runs {@link #commitAsyncNoParam} with the shared {@code kc} consumer on topic "CommitAsyncNoParam". */
    @Test
    public void testCommitAsyncNoParam() throws Exception {
        this.commitAsyncNoParam(kc, STREAM + ":CommitAsyncNoParam");
    }

    /** Runs {@link #commitAsyncNoParam} with the shared {@code kclg} consumer on topic "CommitAsyncNoParamLG". */
    @Test
    public void testCommitAsyncNoParamLG() throws Exception {
        this.commitAsyncNoParam(kclg, STREAM + ":CommitAsyncNoParamLG");
    }

    /** Runs {@link #commitAsyncCallback} with the shared {@code kc} consumer on topic "CommitAsyncCallback". */
    @Test
    public void testCommitAsyncCallback() throws Exception {
        this.commitAsyncCallback(kc, STREAM + ":CommitAsyncCallback");
    }

    /** Runs {@link #commitAsyncCallback} with the shared {@code kclg} consumer on topic "CommitAsyncCallbackLG". */
    @Test
    public void testCommitAsyncCallbackLG() throws Exception {
        this.commitAsyncCallback(kclg, STREAM + ":CommitAsyncCallbackLG");
    }

    /** Runs {@link #commitAsyncCallbackOffsets} with the shared {@code kc} consumer on topic "CommitAsyncCallbackOffsets". */
    @Test
    public void testCommitAsyncCallbackOffsets() throws Exception {
        this.commitAsyncCallbackOffsets(kc, STREAM + ":CommitAsyncCallbackOffsets");
    }

    /** Runs {@link #commitAsyncCallbackOffsets} with the shared {@code kclg} consumer on topic "CommitAsyncCallbackOffsetsLG". */
    @Test
    public void testCommitAsyncCallbackOffsetsLG() throws Exception {
        this.commitAsyncCallbackOffsets(kclg, STREAM + ":CommitAsyncCallbackOffsetsLG");
    }

    /** Runs {@link #commitAsyncCallbackOffsetsAndMetadata} with the shared {@code kc} consumer on topic "CommitAsyncCallbackOffsetsAndMetadata". */
    @Test
    public void testCommitAsyncCallbackOffsetsAndMetadata() throws Exception {
        this.commitAsyncCallbackOffsetsAndMetadata(kc, STREAM + ":CommitAsyncCallbackOffsetsAndMetadata");
    }

    /** Runs {@link #commitAsyncCallbackOffsetsAndMetadata} with the shared {@code kclg} consumer on topic "CommitAsyncCallbackOffsetsAndMetadataLG". */
    @Test
    public void testCommitAsyncCallbackOffsetsAndMetadataLG() throws Exception {
        this.commitAsyncCallbackOffsetsAndMetadata(kclg, STREAM + ":CommitAsyncCallbackOffsetsAndMetadataLG");
    }

    /**
     * Phase 1 of the no-argument commit scenarios: publishes one record to each
     * partition of {@code topicname}, subscribes {@code consumer} to the topic, polls
     * twice, and checks that nothing has been committed yet.
     *
     * <p>For each partition, "nothing committed" is accepted in any of three forms:
     * {@code committed()} throws, returns {@code null}, or returns offset 0.
     *
     * @param consumer        consumer under test
     * @param topicname       full topic name to produce to and subscribe to
     * @param topicpartitions output array (length >= {@code numParts}); filled with the topic's partitions
     * @param futures         output array (length >= {@code numParts}); filled with the send futures
     * @throws Exception if a send fails, or if the thread is interrupted while waiting
     */
    public void testPhase1(KafkaConsumer consumer, String topicname, TopicPartition[] topicpartitions, Future[] futures) throws Exception {
        for (int p = 0; p < numParts; ++p) {
            topicpartitions[p] = new TopicPartition(topicname, p);
        }
        for (int p = 0; p < numParts; ++p) {
            ProducerRecord record = new ProducerRecord(topicname, Integer.valueOf(p), (Object)key, (Object)value);
            futures[p] = producer.send(record);
        }
        producer.flush();
        // Surface any send failure before the consumer side of the test starts.
        for (Future future : futures) {
            future.get();
        }

        RebalanceCb callback = new RebalanceCb();
        ArrayList<String> subscribeList = new ArrayList<String>();
        subscribeList.add(topicname);
        consumer.subscribe(subscribeList, (ConsumerRebalanceListener)callback);
        // Block until the rebalance listener has reported an assignment.
        callback.assignDone();

        // An interrupt now aborts the test instead of being silently discarded.
        Thread.sleep(200L);
        consumer.poll(100L);
        consumer.poll(100L);

        for (int p = 0; p < numParts; ++p) {
            boolean committedException = false;
            OffsetAndMetadata offset = null;
            try {
                offset = consumer.committed(topicpartitions[p]);
            }
            catch (Exception e) {
                committedException = true;
                System.out.println("committed exception: " + e);
            }
            Assert.assertTrue("unexpected committed offset before any commit for " + topicpartitions[p] + ": " + offset,
                    committedException || offset == null || offset.offset() == 0L);
        }
    }

    /**
     * Phase 2 of the no-argument commit scenarios; called after the first commit.
     *
     * <p>Checks that the committed offset of every partition is one past the offset of
     * the record in {@code futuresOld}, then publishes a second record per partition and
     * checks that the committed offsets are unchanged both right after producing and
     * after two further polls.
     *
     * @param consumer        consumer under test (already subscribed)
     * @param topicname       full topic name to produce to
     * @param topicpartitions partitions of the topic, indexed by partition number
     * @param futuresOld      send futures of the first batch, indexed by partition number
     * @param futuresNew      output array (length >= {@code numParts}); filled with the second batch's send futures
     * @throws Exception if a send fails, or if the thread is interrupted while waiting
     */
    public void testPhase2(KafkaConsumer consumer, String topicname, TopicPartition[] topicpartitions, Future[] futuresOld, Future[] futuresNew) throws Exception {
        this.assertCommittedIsOnePast(consumer, topicpartitions, futuresOld);

        for (int p = 0; p < numParts; ++p) {
            ProducerRecord record = new ProducerRecord(topicname, Integer.valueOf(p), (Object)key, (Object)value);
            futuresNew[p] = producer.send(record);
        }
        producer.flush();
        for (Future future : futuresNew) {
            future.get();
        }
        // Producing more records must not move the committed position.
        this.assertCommittedIsOnePast(consumer, topicpartitions, futuresOld);

        // An interrupt now aborts the test instead of being silently discarded.
        Thread.sleep(200L);
        consumer.poll(100L);
        consumer.poll(100L);
        // Polling without committing must not move it either.
        this.assertCommittedIsOnePast(consumer, topicpartitions, futuresOld);
    }

    /** Asserts that each partition's committed offset is the offset of its record in {@code futures} plus one. */
    private void assertCommittedIsOnePast(KafkaConsumer consumer, TopicPartition[] topicpartitions, Future[] futures) throws Exception {
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata committed = consumer.committed(topicpartitions[p]);
            // Fail with a readable message rather than a bare NullPointerException.
            Assert.assertNotNull("no committed offset for " + topicpartitions[p], committed);
            long expected = ((RecordMetadata)futures[p].get()).offset() + 1L;
            Assert.assertEquals("committed offset for " + topicpartitions[p], expected, committed.offset());
        }
    }

    /**
     * Phase 3 of the no-argument commit scenarios: checks that each of the ten
     * partitions' committed offset is one past the offset of its record in
     * {@code futures}, then unsubscribes the consumer.
     *
     * @param consumer        consumer under test
     * @param topicname       full topic name (not read by this method)
     * @param topicpartitions partitions of the topic, indexed by partition number
     * @param futures         send futures of the most recent batch, indexed by partition number
     */
    public void testPhase3(KafkaConsumer consumer, String topicname, TopicPartition[] topicpartitions, Future[] futures) throws Exception {
        int partition = 0;
        while (partition < 10) {
            OffsetAndMetadata committed = consumer.committed(topicpartitions[partition]);
            long actualOffset = committed.offset();
            RecordMetadata produced = (RecordMetadata)futures[partition].get();
            long expectedOffset = produced.offset() + 1L;
            Assert.assertTrue(actualOffset == expectedOffset);
            ++partition;
        }
        consumer.unsubscribe();
    }

    /**
     * Scenario for the no-argument {@code commitSync()}: runs the three test phases
     * with a synchronous commit between phases 1 and 2 and between phases 2 and 3.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name used for the scenario
     */
    public void commitSyncNoParam(KafkaConsumer consumer, String topicname) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[10];
        final Future[] firstBatch = new Future[10];
        final Future[] secondBatch = new Future[10];

        this.testPhase1(consumer, topicname, partitions, firstBatch);
        consumer.commitSync();

        this.testPhase2(consumer, topicname, partitions, firstBatch, secondBatch);
        consumer.commitSync();

        this.testPhase3(consumer, topicname, partitions, secondBatch);
    }

    /**
     * Scenario for {@code commitSync(Map)} with explicit offsets.
     *
     * <p>Round 1 commits offset 2<sup>p</sup> for every even partition {@code p} and
     * checks that even partitions report that offset while odd partitions report no
     * committed offset. Round 2 commits {@code p + 1024} for every partition and checks
     * all of them.
     *
     * <p>The round-1 offset was previously written as {@code 2 ^ p}, which in Java is
     * bitwise XOR (2, 0, 6, 4, 10 for p = 0, 2, 4, 6, 8), so partition 2 committed
     * offset 0. {@code 1L << p} yields the distinct, non-zero powers of two.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name in the form {@code <stream>:<topic>}
     * @throws Exception if topic creation or a commit fails
     */
    public void commitSyncWithOffsets(KafkaConsumer consumer, String topicname) throws Exception {
        String[] tokens = topicname.split(":");
        madmin.createTopic(tokens[0], tokens[1], numParts);
        TopicPartition[] topicpartitions = new TopicPartition[numParts];
        for (int p = 0; p < numParts; ++p) {
            topicpartitions[p] = new TopicPartition(topicname, p);
        }

        // Round 1: even partitions only.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; p += 2) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata(1L << p));
        }
        RebalanceCb callback = new RebalanceCb();
        ArrayList<String> subscribeList = new ArrayList<String>();
        subscribeList.add(topicname);
        consumer.subscribe(subscribeList, (ConsumerRebalanceListener)callback);
        // Block until the rebalance listener has reported an assignment.
        callback.assignDone();
        consumer.commitSync(toCommit);
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata offset = consumer.committed(topicpartitions[p]);
            if (p % 2 == 0) {
                Assert.assertNotNull("no committed offset for " + topicpartitions[p], offset);
                Assert.assertEquals("committed offset for " + topicpartitions[p], 1L << p, offset.offset());
            } else {
                Assert.assertNull("unexpected committed offset for " + topicpartitions[p], offset);
            }
        }

        // Round 2: every partition.
        toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; ++p) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata((long)(p + 1024)));
        }
        consumer.commitSync(toCommit);
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata offset = consumer.committed(topicpartitions[p]);
            Assert.assertNotNull("no committed offset for " + topicpartitions[p], offset);
            Assert.assertEquals("committed offset for " + topicpartitions[p], (long)(p + 1024), offset.offset());
        }
        consumer.unsubscribe();
    }

    /**
     * Scenario for {@code commitSync(Map)} with explicit offsets and metadata strings.
     *
     * <p>Round 1 commits offset 2<sup>p</sup> with metadata {@code "metadata" + p} for
     * every even partition {@code p} and checks that even partitions report both values
     * while odd partitions report no committed offset. Round 2 commits {@code p + 1024}
     * with the same metadata for every partition and checks all of them.
     *
     * <p>The round-1 offset was previously written as {@code 2 ^ p}, which in Java is
     * bitwise XOR (2, 0, 6, 4, 10 for p = 0, 2, 4, 6, 8), so partition 2 committed
     * offset 0. {@code 1L << p} yields the distinct, non-zero powers of two.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name in the form {@code <stream>:<topic>}
     * @throws Exception if topic creation or a commit fails
     */
    public void commitSyncWithOffsetsAndMetadata(KafkaConsumer consumer, String topicname) throws Exception {
        String[] tokens = topicname.split(":");
        madmin.createTopic(tokens[0], tokens[1], numParts);
        TopicPartition[] topicpartitions = new TopicPartition[numParts];
        for (int p = 0; p < numParts; ++p) {
            topicpartitions[p] = new TopicPartition(topicname, p);
        }

        // Round 1: even partitions only.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; p += 2) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata(1L << p, "metadata" + p));
        }
        RebalanceCb callback = new RebalanceCb();
        ArrayList<String> subscribeList = new ArrayList<String>();
        subscribeList.add(topicname);
        consumer.subscribe(subscribeList, (ConsumerRebalanceListener)callback);
        // Block until the rebalance listener has reported an assignment.
        callback.assignDone();
        consumer.commitSync(toCommit);
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata om = consumer.committed(topicpartitions[p]);
            if (p % 2 == 0) {
                Assert.assertNotNull("no committed offset for " + topicpartitions[p], om);
                Assert.assertEquals("committed offset for " + topicpartitions[p], 1L << p, om.offset());
                Assert.assertEquals("committed metadata for " + topicpartitions[p], "metadata" + p, om.metadata());
            } else {
                Assert.assertNull("unexpected committed offset for " + topicpartitions[p], om);
            }
        }

        // Round 2: every partition.
        toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; ++p) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata((long)(p + 1024), "metadata" + p));
        }
        consumer.commitSync(toCommit);
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata om = consumer.committed(topicpartitions[p]);
            Assert.assertNotNull("no committed offset for " + topicpartitions[p], om);
            Assert.assertEquals("committed offset for " + topicpartitions[p], (long)(p + 1024), om.offset());
            Assert.assertEquals("committed metadata for " + topicpartitions[p], "metadata" + p, om.metadata());
        }
        consumer.unsubscribe();
    }

    /**
     * Scenario for the no-argument {@code commitAsync()}: runs the three test phases
     * with an asynchronous commit, followed by a 300 ms pause, between phases 1 and 2
     * and between phases 2 and 3.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name used for the scenario
     */
    public void commitAsyncNoParam(KafkaConsumer consumer, String topicname) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[10];
        final Future[] firstBatch = new Future[10];
        final Future[] secondBatch = new Future[10];

        this.testPhase1(consumer, topicname, partitions, firstBatch);
        consumer.commitAsync();
        ListenerCommitTest.pauseAfterAsyncCommit();

        this.testPhase2(consumer, topicname, partitions, firstBatch, secondBatch);
        consumer.commitAsync();
        ListenerCommitTest.pauseAfterAsyncCommit();

        this.testPhase3(consumer, topicname, partitions, secondBatch);
    }

    /** Sleeps 300 ms after an asynchronous commit; any exception from the sleep is printed and otherwise ignored. */
    private static void pauseAfterAsyncCommit() {
        try {
            Thread.sleep(300L);
        }
        catch (Exception sleepFailure) {
            System.out.println(sleepFailure);
        }
    }

    /**
     * Scenario for {@code commitAsync(OffsetCommitCallback)}: runs the three test
     * phases with a callback-based asynchronous commit between phases 1 and 2 and
     * between phases 2 and 3. Each commit is verified once its callback has fired.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name used for the scenario
     */
    public void commitAsyncCallback(KafkaConsumer consumer, String topicname) throws Exception {
        final TopicPartition[] partitions = new TopicPartition[10];
        final Future[] firstBatch = new Future[10];
        final Future[] secondBatch = new Future[10];

        this.testPhase1(consumer, topicname, partitions, firstBatch);
        this.commitAsyncAndCheckCallback(consumer);

        this.testPhase2(consumer, topicname, partitions, firstBatch, secondBatch);
        this.commitAsyncAndCheckCallback(consumer);

        this.testPhase3(consumer, topicname, partitions, secondBatch);
    }

    /**
     * Issues a callback-based asynchronous commit, waits for the callback, and asserts
     * that it reported no exception and that every offset it reported equals what
     * {@code committed()} returns for that partition.
     */
    private void commitAsyncAndCheckCallback(KafkaConsumer consumer) throws Exception {
        CommitCb commitCallback = new CommitCb();
        consumer.commitAsync((OffsetCommitCallback)commitCallback);
        commitCallback.commitDone();
        Assert.assertTrue(commitCallback.getException() == null);
        Map<TopicPartition, OffsetAndMetadata> reported = commitCallback.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> reportedEntry : reported.entrySet()) {
            OffsetAndMetadata stored = consumer.committed(reportedEntry.getKey());
            Assert.assertTrue(reportedEntry.getValue().equals((Object)stored));
        }
    }

    /**
     * Scenario for {@code commitAsync(Map, OffsetCommitCallback)} with explicit offsets.
     *
     * <p>Round 1 commits offset 2<sup>p</sup> for every even partition {@code p}, waits
     * for the callback, and checks that even partitions report that offset while odd
     * partitions report nothing committed ({@code committed()} throws, returns
     * {@code null}, or returns offset 0). Round 2 commits {@code p + 1024} for every
     * partition and checks all of them.
     *
     * <p>The round-1 offset was previously written as {@code 2 ^ p}, which in Java is
     * bitwise XOR (2, 0, 6, 4, 10 for p = 0, 2, 4, 6, 8), so partition 2 committed
     * offset 0 - the same value this method accepts as "nothing committed" for odd
     * partitions. {@code 1L << p} yields the distinct, non-zero powers of two.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name in the form {@code <stream>:<topic>}
     * @throws Exception if topic creation fails
     */
    public void commitAsyncCallbackOffsets(KafkaConsumer consumer, String topicname) throws Exception {
        String[] tokens = topicname.split(":");
        madmin.createTopic(tokens[0], tokens[1], numParts);
        TopicPartition[] topicpartitions = new TopicPartition[numParts];
        for (int p = 0; p < numParts; ++p) {
            topicpartitions[p] = new TopicPartition(topicname, p);
        }

        // Round 1: even partitions only.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; p += 2) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata(1L << p));
        }
        RebalanceCb callback = new RebalanceCb();
        ArrayList<String> subscribeList = new ArrayList<String>();
        subscribeList.add(topicname);
        consumer.subscribe(subscribeList, (ConsumerRebalanceListener)callback);
        // Block until the rebalance listener has reported an assignment.
        callback.assignDone();
        CommitCb ccb = new CommitCb();
        consumer.commitAsync(toCommit, (OffsetCommitCallback)ccb);
        // Block until the commit callback has fired.
        ccb.commitDone();
        Assert.assertNull("async commit reported an exception", ccb.getException());
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = ccb.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            Assert.assertEquals("callback offset vs committed() for " + entry.getKey(),
                    entry.getValue(), consumer.committed(entry.getKey()));
        }
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata offset = null;
            boolean exceptionCaught = false;
            try {
                offset = consumer.committed(topicpartitions[p]);
            }
            catch (Exception e) {
                exceptionCaught = true;
                System.out.println("commited exception: " + e);
            }
            if (p % 2 == 0) {
                // Also covers the case where committed() threw: fail with a message, not an NPE.
                Assert.assertNotNull("no committed offset for " + topicpartitions[p], offset);
                Assert.assertEquals("committed offset for " + topicpartitions[p], 1L << p, offset.offset());
            } else {
                Assert.assertTrue("unexpected committed offset for " + topicpartitions[p] + ": " + offset,
                        exceptionCaught || offset == null || offset.offset() == 0L);
            }
        }

        // Round 2: every partition.
        toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; ++p) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata((long)(p + 1024)));
        }
        ccb = new CommitCb();
        consumer.commitAsync(toCommit, (OffsetCommitCallback)ccb);
        ccb.commitDone();
        Assert.assertNull("async commit reported an exception", ccb.getException());
        committedOffsets = ccb.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            Assert.assertEquals("callback offset vs committed() for " + entry.getKey(),
                    entry.getValue(), consumer.committed(entry.getKey()));
        }
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata offset = consumer.committed(topicpartitions[p]);
            Assert.assertNotNull("no committed offset for " + topicpartitions[p], offset);
            Assert.assertEquals("committed offset for " + topicpartitions[p], (long)(p + 1024), offset.offset());
        }
        consumer.unsubscribe();
    }

    /**
     * Scenario for {@code commitAsync(Map, OffsetCommitCallback)} with explicit offsets
     * and metadata strings.
     *
     * <p>Round 1 commits offset 2<sup>p</sup> with metadata {@code "metadata" + p} for
     * every even partition {@code p}, waits for the callback, and checks that even
     * partitions report both values while odd partitions report nothing committed
     * ({@code committed()} throws, returns {@code null}, or returns offset 0). Round 2
     * commits {@code p + 1024} with the same metadata for every partition and checks
     * all of them.
     *
     * <p>The round-1 offset was previously written as {@code 2 ^ p}, which in Java is
     * bitwise XOR (2, 0, 6, 4, 10 for p = 0, 2, 4, 6, 8), so partition 2 committed
     * offset 0 - the same value this method accepts as "nothing committed" for odd
     * partitions. {@code 1L << p} yields the distinct, non-zero powers of two.
     *
     * @param consumer  consumer under test
     * @param topicname full topic name in the form {@code <stream>:<topic>}
     * @throws Exception if topic creation fails
     */
    public void commitAsyncCallbackOffsetsAndMetadata(KafkaConsumer consumer, String topicname) throws Exception {
        String[] tokens = topicname.split(":");
        madmin.createTopic(tokens[0], tokens[1], numParts);
        TopicPartition[] topicpartitions = new TopicPartition[numParts];
        for (int p = 0; p < numParts; ++p) {
            topicpartitions[p] = new TopicPartition(topicname, p);
        }

        // Round 1: even partitions only.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; p += 2) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata(1L << p, "metadata" + p));
        }
        RebalanceCb callback = new RebalanceCb();
        ArrayList<String> subscribeList = new ArrayList<String>();
        subscribeList.add(topicname);
        consumer.subscribe(subscribeList, (ConsumerRebalanceListener)callback);
        // Block until the rebalance listener has reported an assignment.
        callback.assignDone();
        CommitCb ccb = new CommitCb();
        consumer.commitAsync(toCommit, (OffsetCommitCallback)ccb);
        // Block until the commit callback has fired.
        ccb.commitDone();
        Assert.assertNull("async commit reported an exception", ccb.getException());
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = ccb.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            Assert.assertEquals("callback offset vs committed() for " + entry.getKey(),
                    entry.getValue(), consumer.committed(entry.getKey()));
        }
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata om = null;
            boolean exceptionCaught = false;
            try {
                om = consumer.committed(topicpartitions[p]);
            }
            catch (Exception e) {
                exceptionCaught = true;
                System.out.println("commited exception: " + e);
            }
            if (p % 2 == 0) {
                // Also covers the case where committed() threw: fail with a message, not an NPE.
                Assert.assertNotNull("no committed offset for " + topicpartitions[p], om);
                Assert.assertEquals("committed offset for " + topicpartitions[p], 1L << p, om.offset());
                Assert.assertEquals("committed metadata for " + topicpartitions[p], "metadata" + p, om.metadata());
            } else {
                Assert.assertTrue("unexpected committed offset for " + topicpartitions[p] + ": " + om,
                        exceptionCaught || om == null || om.offset() == 0L);
            }
        }

        // Round 2: every partition.
        toCommit = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (int p = 0; p < numParts; ++p) {
            toCommit.put(topicpartitions[p], new OffsetAndMetadata((long)(p + 1024), "metadata" + p));
        }
        ccb = new CommitCb();
        consumer.commitAsync(toCommit, (OffsetCommitCallback)ccb);
        ccb.commitDone();
        Assert.assertNull("async commit reported an exception", ccb.getException());
        committedOffsets = ccb.getCommittedOffsets();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            Assert.assertEquals("callback offset vs committed() for " + entry.getKey(),
                    entry.getValue(), consumer.committed(entry.getKey()));
        }
        for (int p = 0; p < numParts; ++p) {
            OffsetAndMetadata om = consumer.committed(topicpartitions[p]);
            Assert.assertNotNull("no committed offset for " + topicpartitions[p], om);
            Assert.assertEquals("committed offset for " + topicpartitions[p], (long)(p + 1024), om.offset());
            Assert.assertEquals("committed metadata for " + topicpartitions[p], "metadata" + p, om.metadata());
        }
        consumer.unsubscribe();
    }

    // Payload and key shared by every produced record: 200 zero bytes and the bytes of "abc".
    static {
        value = new byte[200];
        // Explicit charset so the key bytes do not depend on the platform default encoding.
        key = "abc".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    /**
     * Commit callback that records the outcome of one asynchronous commit and lets a
     * test thread block until that outcome is available.
     *
     * <p>All state is guarded by this object's monitor.
     */
    public final class CommitCb
    implements OffsetCommitCallback {
        private boolean committed = false;
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
        private Exception committedException = null;

        /**
         * Stores the reported offsets and exception, marks the commit as complete, and
         * wakes every thread blocked in {@link #commitDone()}.
         *
         * @param offsets   offsets reported by the commit
         * @param exception failure reported by the commit, or {@code null}
         */
        public synchronized void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.committedOffsets = offsets;
            this.committedException = exception;
            this.committed = true;
            this.notifyAll();
        }

        /**
         * Blocks until {@link #onComplete} has been called.
         *
         * <p>The wait is not abandoned on interruption, but the thread's interrupt
         * status is restored before returning so callers can still observe it.
         */
        public synchronized void commitDone() {
            boolean interrupted = false;
            while (!this.committed) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    // Keep waiting for the callback; remember the interrupt for the caller.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** @return the offsets passed to {@link #onComplete}, or {@code null} if it has not run yet */
        public synchronized Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets() {
            return this.committedOffsets;
        }

        /** @return the exception passed to {@link #onComplete}, or {@code null} if there was none or it has not run yet */
        public synchronized Exception getException() {
            return this.committedException;
        }
    }

    /**
     * Rebalance listener that lets a test thread block until a partition assignment or
     * revocation has been reported.
     *
     * <p>All state is guarded by this object's monitor. Each wait method consumes the
     * event it waited for by resetting the corresponding flag.
     */
    public final class RebalanceCb
    implements ConsumerRebalanceListener {
        private boolean revoked = false;
        private boolean assigned = false;

        /** Resets both the "revoked" and the "assigned" flags. */
        public synchronized void clear() {
            this.revoked = false;
            this.assigned = false;
        }

        /**
         * Blocks until {@link #onPartitionsRevoked} has been called, then resets the flag.
         *
         * <p>The wait is not abandoned on interruption, but the thread's interrupt
         * status is restored before returning so callers can still observe it.
         */
        public synchronized void revokeDone() {
            boolean interrupted = false;
            while (!this.revoked) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    // Keep waiting for the event; remember the interrupt for the caller.
                    interrupted = true;
                }
            }
            this.revoked = false;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Blocks until {@link #onPartitionsAssigned} has been called, then resets the flag.
         *
         * <p>The wait is not abandoned on interruption, but the thread's interrupt
         * status is restored before returning so callers can still observe it.
         */
        public synchronized void assignDone() {
            boolean interrupted = false;
            while (!this.assigned) {
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    // Keep waiting for the event; remember the interrupt for the caller.
                    interrupted = true;
                }
            }
            this.assigned = false;
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Marks an assignment as seen and wakes every waiting thread.
         *
         * @param partitions the assigned partitions (not inspected)
         */
        public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.assigned = true;
            this.notifyAll();
        }

        /**
         * Marks a revocation as seen and wakes every waiting thread.
         *
         * @param partitions the revoked partitions (not inspected)
         */
        public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.revoked = true;
            this.notifyAll();
        }
    }
}

