/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.CursorInfo;
import com.mapr.streams.impl.admin.MarlinAdminImpl;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.Assert;

public class Listener
implements Runnable {
    public static String topicIntial = new String("topic");
    private Configuration conf;
    private Admin madmin;
    protected String streamName;
    protected int numStreams = 2;
    private int numTopics = 2;
    private int numSlowTopics = 2;
    private int numPartitions = 4;
    private int numExpectedMsgs = 1000;
    private int maxPartitionFetchSize = 65536;
    private int maxFetchSize = 131072;
    private int maxPollRecords = Integer.MAX_VALUE;
    protected boolean verifyKeys = true;
    protected boolean verifyHeaderKey = true;
    protected boolean verifyHeaderVal = true;
    protected boolean keysInOrder = false;
    private boolean allowDuplicateKeys = false;
    protected boolean isTracingEnabled = false;
    private boolean printStats = true;
    private TED_ACTION action;
    private boolean topicSubscription = false;
    private String groupId = null;
    private Hashtable<PartitionInfo, Integer> partitionSeqMap;
    private Hashtable<PartitionInfo, boolean[]> partitionBArrayMap;
    protected boolean status;

    public static void usage() {
        System.err.println("Listener -path <stream-intialname> [-nstreams <num streams> -ntopics <num topics> -nslowtopics <num topics> -npart <numpartitions per topic> -nmsgs <num messages per topicfeed>]");
        System.exit(1);
    }

    public static void main(String[] args) throws Exception {
        String streamName = null;
        int numStreams = 2;
        int numTopics = 2;
        int numSlowTopics = 2;
        int numPartitions = 4;
        int numExpectedMsgs = 1000;
        boolean verifyKeys = true;
        boolean keysInOrder = true;
        boolean allowDuplicateKeys = false;
        boolean isTracingEnabled = false;
        TED_ACTION action = TED_ACTION.kNone;
        boolean topicSubscription = false;
        String groupId = null;
        int maxPartitionFetchSize = 65536;
        int maxFetchSize = 131072;
        int maxPollRecords = Integer.MAX_VALUE;
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                streamName = args[i];
                continue;
            }
            if (args[i].equals("-nstreams")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                numStreams = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-ntopics")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                numTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-nslowtopics")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                numSlowTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-npart")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                numPartitions = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-nmsgs")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                numExpectedMsgs = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-verifykeys")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                verifyKeys = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-keysinorder")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                keysInOrder = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-debug")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                isTracingEnabled = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-allowduplicates")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                allowDuplicateKeys = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-tedaction")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                action = TED_ACTION.parseTedAction(Integer.parseInt(args[i]));
                System.out.println("Setting tedaction to " + action);
                continue;
            }
            if (args[i].equals("-topicsubscr")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                topicSubscription = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-groupid")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                groupId = args[i];
                continue;
            }
            if (args[i].equals("-maxpartitionfetchsize")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                maxPartitionFetchSize = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-maxfetchsize")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                maxFetchSize = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-maxpollrecords")) {
                if (++i >= args.length) {
                    Listener.usage();
                }
                maxPollRecords = Integer.parseInt(args[i]);
                continue;
            }
            Listener.usage();
        }
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, verifyKeys, keysInOrder, allowDuplicateKeys, isTracingEnabled, true, action, topicSubscription, groupId, maxPartitionFetchSize, maxFetchSize, maxPollRecords);
        Thread lt = new Thread(lp);
        lt.start();
        lt.join();
        if (!lp.status) {
            System.out.println("Listener test failed.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static boolean runLGTest(String streamName, int numStreams, int numTopics, int numPartitions, boolean isTracingEnabled, String groupId) {
        KafkaConsumer l1 = null;
        KafkaConsumer l2 = null;
        KafkaConsumer l3 = null;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        ArrayList<CallSite> topicList = new ArrayList<CallSite>();
        try {
            String[] tokens;
            Admin madmin = null;
            Configuration conf = new Configuration();
            try {
                madmin = Streams.newAdmin((Configuration)conf);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            int totalNumTopics = numTopics;
            for (int sIdx = 0; sIdx < numStreams; ++sIdx) {
                for (int i = 0; i < totalNumTopics; ++i) {
                    String streamTopicName = streamName + sIdx + ":" + topicIntial + i;
                    topicList.add((CallSite)((Object)streamTopicName));
                    for (int j = 0; j < numPartitions; ++j) {
                        partitions.add(new TopicPartition(streamTopicName, j));
                    }
                }
            }
            Properties props = new Properties();
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("auto.offset.reset", "earliest");
            props.put("max.partition.fetch.bytes", (Object)65536);
            props.put("fetch.max.bytes", (Object)65536);
            props.put("max.poll.records", (Object)Integer.MAX_VALUE);
            if (groupId != null) {
                props.put("group.id", groupId);
            }
            ByteArrayDeserializer keyD = new ByteArrayDeserializer();
            ByteArrayDeserializer valueD = new ByteArrayDeserializer();
            l1 = new KafkaConsumer(props, (Deserializer)keyD, (Deserializer)valueD);
            RebCb cb = new RebCb(partitions);
            cb.addConsumer((Consumer<?, ?>)l1);
            l1.subscribe(topicList, (ConsumerRebalanceListener)cb);
            System.out.println("Creating topics");
            for (String string : topicList) {
                try {
                    String[] tokens2 = string.split(":");
                    madmin.createTopic(tokens2[0], tokens2[1], numPartitions);
                }
                catch (Exception e) {
                    System.out.println("Topic create failed");
                }
            }
            System.out.println("Waiting for topic subscriptions");
            cb.WaitForAllSubscribed();
            Set subscribedl1 = l1.assignment();
            Set set = l1.subscription();
            System.out.println("All subscribed: L1, partitions " + subscribedl1.size() + ", topics " + set.size());
            if (subscribedl1.size() != numStreams * totalNumTopics * numPartitions) {
                System.out.println("All subscribed through L1, expected " + numStreams * totalNumTopics * numPartitions + " but got " + subscribedl1.size());
                boolean e = false;
                return e;
            }
            if (set.size() != numStreams * totalNumTopics) {
                System.out.println("All subscribed through L1, expected " + numStreams * totalNumTopics + " but got " + set.size());
                boolean e = false;
                return e;
            }
            l2 = new KafkaConsumer(props, (Deserializer)keyD, (Deserializer)valueD);
            cb.addConsumer((Consumer<?, ?>)l2);
            l2.subscribe(topicList, (ConsumerRebalanceListener)cb);
            cb.WaitForRevoke();
            cb.WaitForAllSubscribed();
            subscribedl1 = l1.assignment();
            Set set2 = l1.subscription();
            Set subscribedl2 = l2.assignment();
            Set subscribedTopicsl2 = l2.subscription();
            System.out.println("Two listeners: L1 (partitions " + subscribedl1.size() + ", topics " + set2.size() + "), L2 (partitiosn " + subscribedl2.size() + ", topics " + subscribedTopicsl2.size() + ")");
            if (subscribedl1.size() + subscribedl2.size() != numStreams * totalNumTopics * numPartitions) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics * numPartitions + " found:" + (subscribedl1.size() + subscribedl2.size()));
                boolean bl = false;
                return bl;
            }
            HashSet totalSubscription = new HashSet();
            totalSubscription.addAll(set2);
            totalSubscription.addAll(subscribedTopicsl2);
            if (totalSubscription.size() != numStreams * totalNumTopics) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics + " found:" + totalSubscription.size());
                boolean bl = false;
                return bl;
            }
            int maxPartitions = (numPartitions / 2 + 1) * (numStreams * numTopics);
            int minPartitions = numPartitions / 2 * (numStreams * numTopics);
            if (subscribedl1.size() < minPartitions || subscribedl1.size() > maxPartitions || subscribedl2.size() < minPartitions || subscribedl2.size() > maxPartitions) {
                System.out.println("l1 subscription size " + subscribedl1.size());
                System.out.println("l2 subscription size " + subscribedl2.size());
                boolean bl = false;
                return bl;
            }
            numPartitions *= 2;
            ArrayList<TopicPartition> newPartitions = new ArrayList<TopicPartition>();
            for (int sIdx = 0; sIdx < numStreams; ++sIdx) {
                for (int i = 0; i < totalNumTopics; ++i) {
                    String streamTopicName = streamName + sIdx + ":" + topicIntial + i;
                    for (int j = 0; j < numPartitions; ++j) {
                        newPartitions.add(new TopicPartition(streamTopicName, j));
                    }
                }
            }
            cb.Reallocate(newPartitions);
            for (String string : topicList) {
                try {
                    String[] tokens3 = string.split(":");
                    madmin.editTopic(tokens3[0], tokens3[1], numPartitions);
                }
                catch (Exception e) {
                    System.out.println("Topic edit failed");
                }
            }
            cb.WaitForAllSubscribed();
            System.out.println("Checking subscriptions after partition change");
            subscribedl1 = l1.assignment();
            Set set3 = l1.subscription();
            subscribedl2 = l2.assignment();
            subscribedTopicsl2 = l2.subscription();
            System.out.println("Partition change: L1 (partitions " + subscribedl1.size() + ", topics " + set3.size() + "), L2 (partitiosn " + subscribedl2.size() + ", topics " + subscribedTopicsl2.size() + ")");
            if (subscribedl1.size() + subscribedl2.size() != numStreams * totalNumTopics * numPartitions) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics * numPartitions + " found:" + (subscribedl1.size() + subscribedl2.size()));
                boolean sIdx = false;
                return sIdx;
            }
            totalSubscription.clear();
            totalSubscription.addAll(set3);
            totalSubscription.addAll(subscribedTopicsl2);
            if (totalSubscription.size() != numStreams * totalNumTopics) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics + " found:" + totalSubscription.size());
                boolean sIdx = false;
                return sIdx;
            }
            maxPartitions = (numPartitions / 2 + 1) * (numStreams * numTopics);
            minPartitions = numPartitions / 2 * (numStreams * numTopics);
            if (subscribedl1.size() < minPartitions || subscribedl1.size() > maxPartitions || subscribedl2.size() < minPartitions || subscribedl2.size() > maxPartitions) {
                System.out.println("l1 subscription size " + subscribedl1.size());
                System.out.println("l2 subscription size " + subscribedl2.size());
                boolean sIdx = false;
                return sIdx;
            }
            l3 = new KafkaConsumer(props, (Deserializer)keyD, (Deserializer)valueD);
            cb.addConsumer((Consumer<?, ?>)l3);
            l3.subscribe(topicList, (ConsumerRebalanceListener)cb);
            cb.WaitForRevoke();
            cb.WaitForAllSubscribed();
            subscribedl1 = l1.assignment();
            subscribedl2 = l2.assignment();
            Set subscribedl3 = l3.assignment();
            Set set4 = l1.subscription();
            subscribedTopicsl2 = l2.subscription();
            Set set5 = l3.subscription();
            System.out.println("New listener L3 added: L1 (partitions " + subscribedl1.size() + ", topics " + set4.size() + "), L2 (partitiosn " + subscribedl2.size() + ", topics " + subscribedTopicsl2.size() + "), L3 (partitions " + subscribedl3.size() + ", topics " + set5.size() + ")");
            if (subscribedl1.size() + subscribedl2.size() + subscribedl3.size() != numStreams * totalNumTopics * numPartitions) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics * numPartitions + " found:" + (subscribedl1.size() + subscribedl2.size() + subscribedl3.size()));
                boolean bl = false;
                return bl;
            }
            totalSubscription.clear();
            totalSubscription.addAll(set4);
            totalSubscription.addAll(subscribedTopicsl2);
            totalSubscription.addAll(set5);
            if (totalSubscription.size() != numStreams * totalNumTopics) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics + " found:" + totalSubscription.size());
                boolean bl = false;
                return bl;
            }
            maxPartitions = (numPartitions / 3 + 1) * (numStreams * numTopics);
            minPartitions = numPartitions / 3 * (numStreams * numTopics);
            if (subscribedl1.size() < minPartitions || subscribedl1.size() > maxPartitions || subscribedl2.size() < minPartitions || subscribedl2.size() > maxPartitions || subscribedl3.size() < minPartitions || subscribedl3.size() > maxPartitions) {
                System.out.println("l1 subscription size " + subscribedl1.size());
                System.out.println("l2 subscription size " + subscribedl2.size());
                System.out.println("l3 subscription size " + subscribedl3.size());
                boolean bl = false;
                return bl;
            }
            l2.unsubscribe();
            cb.WaitForRevoke();
            cb.WaitForAllSubscribed();
            subscribedl1 = l1.assignment();
            subscribedl2 = l2.assignment();
            subscribedl3 = l3.assignment();
            Set set6 = l1.subscription();
            subscribedTopicsl2 = l2.subscription();
            Set set7 = l3.subscription();
            System.out.println("L2 unsubscribed: L1 (partitions " + subscribedl1.size() + ", topics " + set6.size() + "), L2 (partitiosn " + subscribedl2.size() + ", topics " + subscribedTopicsl2.size() + "), L3 (partitions " + subscribedl3.size() + ", topics " + set7.size() + ")");
            if (subscribedl1.size() + subscribedl3.size() != numStreams * totalNumTopics * numPartitions) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics * numPartitions + " found:" + (subscribedl1.size() + subscribedl3.size()));
                boolean bl = false;
                return bl;
            }
            totalSubscription.clear();
            totalSubscription.addAll(set6);
            totalSubscription.addAll(subscribedTopicsl2);
            totalSubscription.addAll(set7);
            if (totalSubscription.size() != numStreams * totalNumTopics) {
                System.out.println("Expected total subscriptions " + numStreams * totalNumTopics + " found:" + totalSubscription.size());
                boolean bl = false;
                return bl;
            }
            maxPartitions = (numPartitions / 2 + 1) * (numStreams * numTopics);
            minPartitions = numPartitions / 2 * (numStreams * numTopics);
            if (subscribedl1.size() < minPartitions || subscribedl1.size() > maxPartitions || subscribedl3.size() < minPartitions || subscribedl3.size() > maxPartitions) {
                System.out.println("l1 subscription size " + subscribedl1.size());
                System.out.println("l2 subscription size " + subscribedl3.size());
                boolean bl = false;
                return bl;
            }
            System.out.println("Delete all the topics and make sure all the partitions get unsubscribed");
            for (String string : topicList) {
                try {
                    tokens = string.split(":");
                    madmin.deleteTopic(tokens[0], tokens[1]);
                }
                catch (Exception e) {
                    System.out.println("Topic delete failed");
                }
            }
            cb.WaitForAllUnsubscribed();
            System.out.println("Recreating all topics");
            for (String string : topicList) {
                try {
                    tokens = string.split(":");
                    madmin.createTopic(tokens[0], tokens[1], numPartitions);
                }
                catch (Exception e) {
                    System.out.println("Topic recreate failed");
                }
            }
            System.out.println("Waiting for all subscriptions to come back");
            cb.WaitForAllSubscribed();
            System.out.println("Unsubscribing all");
            l1.unsubscribe();
            l3.unsubscribe();
            cb.WaitForAllUnsubscribed();
        }
        finally {
            if (l1 != null) {
                l1.close();
            }
        }
        return true;
    }

    public static boolean runTestWithPollOptions(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, int maxPartitionFetchSize, int maxFetchSize, int maxPollRecords) throws IOException {
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, true, false, false, false, TED_ACTION.kNone, false, null, maxPartitionFetchSize, maxFetchSize, maxPollRecords, true, true);
        lp.runWithException();
        return lp.status;
    }

    public static boolean runTest(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs) throws IOException {
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, true, false, false, false, TED_ACTION.kNone, false, null, 65536, 131072, Integer.MAX_VALUE, true, true);
        lp.runWithException();
        return lp.status;
    }

    public static boolean runTest(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, String lgGrp) throws IOException {
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, true, false, false, false, TED_ACTION.kNone, false, lgGrp, 65536, 131072, Integer.MAX_VALUE, true, true);
        lp.runWithException();
        return lp.status;
    }

    public static boolean runTest(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, TED_ACTION action) throws IOException {
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, true, false, false, false, action, false, null, 65536, 131072, Integer.MAX_VALUE, true, true);
        lp.runWithException();
        return lp.status;
    }

    public static boolean runTest(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, boolean topicSubscription, boolean keysInOrder, String groupId) throws IOException {
        Listener lp = new Listener(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, true, keysInOrder, false, false, false, TED_ACTION.kNone, topicSubscription, groupId, 65536, 131072, Integer.MAX_VALUE, true, true);
        lp.runWithException();
        return lp.status;
    }

    public Listener(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, boolean verifyKeys, boolean keysInOrder, boolean allowDuplicateKeys, boolean isTracingEnabled, boolean printStats, TED_ACTION action, boolean topicSubscription, String groupId, int maxPartitionFetchSize, int maxFetchSize, int maxPollRecords, boolean verifyHdrKey, boolean verifyHdrVal) {
        this.streamName = streamName;
        this.numStreams = numStreams;
        this.numTopics = numTopics;
        this.numSlowTopics = numSlowTopics;
        this.numPartitions = numPartitions;
        this.numExpectedMsgs = numExpectedMsgs;
        this.verifyKeys = verifyKeys;
        this.keysInOrder = keysInOrder;
        this.allowDuplicateKeys = allowDuplicateKeys;
        this.isTracingEnabled = isTracingEnabled;
        this.printStats = printStats;
        this.action = action;
        this.topicSubscription = topicSubscription;
        this.groupId = groupId;
        this.maxPartitionFetchSize = maxPartitionFetchSize;
        this.maxFetchSize = maxFetchSize;
        this.maxPollRecords = maxPollRecords;
        this.verifyHeaderKey = verifyHdrKey;
        this.verifyHeaderVal = verifyHdrVal;
        if (Boolean.getBoolean(System.getProperty("DISABLE_V10_TESTS"))) {
            this.verifyHeaderKey = false;
            this.verifyHeaderVal = false;
        }
        this.conf = new Configuration();
        try {
            this.madmin = Streams.newAdmin((Configuration)this.conf);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public Listener(String streamName, int numStreams, int numTopics, int numSlowTopics, int numPartitions, int numExpectedMsgs, boolean verifyKeys, boolean keysInOrder, boolean allowDuplicateKeys, boolean isTracingEnabled, boolean printStats, TED_ACTION action, boolean topicSubscription, String groupId, int maxPartitionFetchSize, int maxFetchSize, int maxPollRecords) {
        this(streamName, numStreams, numTopics, numSlowTopics, numPartitions, numExpectedMsgs, verifyKeys, keysInOrder, allowDuplicateKeys, isTracingEnabled, printStats, action, topicSubscription, groupId, maxPartitionFetchSize, maxFetchSize, maxPollRecords, false, false);
    }

    @Override
    public void run() {
        try {
            this.runWithException();
        }
        catch (IOException e) {
            System.out.println(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runWithException() throws IOException {
        KafkaConsumer listener = null;
        ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
        ArrayList<CallSite> topicList = new ArrayList<CallSite>();
        Thread tt = null;
        try {
            String[] tokens;
            int sleepMaxTries;
            Set subscribed;
            Set subscribedTopics;
            int pollsWithMissingMsgs = 0;
            this.status = false;
            if (this.streamName == null || this.streamName.length() == 0) {
                System.err.println("stream name cannot be empty.");
                Listener.usage();
            }
            if (this.keysInOrder) {
                this.partitionSeqMap = new Hashtable();
            } else {
                this.partitionBArrayMap = new Hashtable();
            }
            int totalNumTopics = this.numTopics + this.numSlowTopics;
            for (int sIdx = 0; sIdx < this.numStreams; ++sIdx) {
                for (int i = 0; i < totalNumTopics; ++i) {
                    String streamTopicName = this.streamName + sIdx + ":" + topicIntial + i;
                    topicList.add((CallSite)((Object)streamTopicName));
                    for (int j = 0; j < this.numPartitions; ++j) {
                        partitions.add(new TopicPartition(streamTopicName, j));
                        if (!this.isTracingEnabled) continue;
                        System.out.println("Creating a subscription for " + streamTopicName + " feed:" + j);
                    }
                }
            }
            Properties props = new Properties();
            Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("max.partition.fetch.bytes", (Object)this.maxPartitionFetchSize);
            props.put("fetch.max.bytes", (Object)this.maxFetchSize);
            props.put("max.poll.records", (Object)this.maxPollRecords);
            props.put(cdef.getMetadataMaxAge(), (Object)100);
            if (this.groupId != null) {
                props.put("group.id", this.groupId);
            }
            ByteArrayDeserializer keyD = new ByteArrayDeserializer();
            ByteArrayDeserializer valueD = new ByteArrayDeserializer();
            listener = new KafkaConsumer(props, (Deserializer)keyD, (Deserializer)valueD);
            RebCb cb = new RebCb(partitions);
            cb.addConsumer((Consumer<?, ?>)listener);
            if (this.topicSubscription) {
                if (this.groupId != null) {
                    listener.subscribe(topicList, (ConsumerRebalanceListener)cb);
                    cb.WaitForAllSubscribed();
                } else {
                    listener.subscribe(topicList);
                }
                subscribedTopics = listener.subscription();
                System.out.println("Using subscribe() api, got " + subscribedTopics.size() + " from subscription()");
                if (subscribedTopics.size() != this.numStreams * totalNumTopics) {
                    throw new IOException("Subscription failed. Expected:" + this.numStreams * totalNumTopics + " found:" + subscribedTopics.size());
                }
            } else {
                listener.assign(partitions);
                listener.assign(partitions);
                subscribedTopics = listener.subscription();
                System.out.println("Using assign() api, got " + subscribedTopics.size() + " from subscription()");
                if (!subscribedTopics.isEmpty()) {
                    throw new IOException("Subscription failed.  Should not have topic subscription when using assign() api");
                }
            }
            if ((subscribed = listener.assignment()).size() != this.numStreams * totalNumTopics * this.numPartitions) {
                throw new IOException("Subscription failed. Expected:" + this.numStreams * totalNumTopics * this.numPartitions + " found:" + subscribed.size());
            }
            System.out.println("Subscription successful:" + subscribed.size());
            for (TopicPartition p : subscribed) {
                long position = listener.position(p);
                if (this.isTracingEnabled) {
                    System.out.println("Subscribed to " + p.topic() + " partition:" + p.partition() + " position:" + position + "(" + this.numExpectedMsgs + ")");
                }
                listener.seekToBeginning(new TopicPartition[]{p});
            }
            PerfStats stats = new PerfStats();
            if (this.action != TED_ACTION.kNone) {
                TedThread ttp = new TedThread(this.action, this);
                tt = new Thread(ttp);
                tt.start();
            }
            do {
                ConsumerRecords recs;
                if (this.isTracingEnabled) {
                    System.out.println("pollsWithMissingMsgs " + pollsWithMissingMsgs);
                }
                pollsWithMissingMsgs = (recs = listener.poll(1000L)).count() == 0 ? ++pollsWithMissingMsgs : 0;
                this.VerifyAndAddStats((ConsumerRecords<byte[], byte[]>)recs, stats);
            } while (pollsWithMissingMsgs < 2);
            if (this.verifyKeys) {
                this.VerifyPollingEnd();
            }
            if (this.printStats) {
                stats.printReport();
            }
            this.status = true;
            System.out.println("committing offsets");
            listener.commitSync();
            System.out.println("returning from commit offset");
            if (this.groupId != null && totalNumTopics < 10) {
                System.out.println("verifying list cursors");
                for (int sIdx = 0; sIdx < this.numStreams; ++sIdx) {
                    for (int i = 0; i < totalNumTopics; ++i) {
                        MarlinAdminImpl admin = (MarlinAdminImpl)this.madmin;
                        List ci = admin.listCursors(this.streamName + sIdx, this.groupId, topicIntial + i, -1);
                        if (ci.size() != this.numPartitions) {
                            throw new IOException("Expected " + this.numPartitions + " cursorInfo structure returned " + ci.size());
                        }
                        for (CursorInfo cursorInfo : ci) {
                            TopicPartition p = (TopicPartition)partitions.get(sIdx * totalNumTopics * this.numPartitions + i * this.numPartitions + cursorInfo.feedId());
                            long committedOffset = listener.committed(p).offset();
                            if (committedOffset == cursorInfo.cursor()) continue;
                            throw new IOException("read offset from disk:" + cursorInfo.cursor() + " queryied:" + committedOffset);
                        }
                    }
                }
                System.out.println("verifying list cursors done");
            }
            long[] committed = new long[subscribed.size()];
            int i = 0;
            for (Object p : subscribed) {
                committed[i] = listener.committed((TopicPartition)p).offset();
                long l = listener.position((TopicPartition)p);
                if (l != committed[i]) {
                    System.out.println("Postion is :" + l + " committed:" + committed[i]);
                    throw new IOException("position is not what was expected");
                }
                listener.seekToBeginning(new TopicPartition[]{p});
                if (listener.position((TopicPartition)p) != 0L) {
                    throw new IOException("seekToBeginning failed");
                }
                ++i;
            }
            TopicPartition[] partsToSeek = new TopicPartition[partitions.size()];
            partitions.toArray(partsToSeek);
            listener.seekToEnd(partsToSeek);
            i = 0;
            for (TopicPartition topicPartition : subscribed) {
                long l = listener.position(topicPartition);
                if (l >= committed[i++]) continue;
                System.out.println("Postion after seekToEnd is :" + l + " committed:" + committed[i]);
                throw new IOException("position is not what was expected, after seekToEnd");
            }
            for (String string : topicList) {
                String[] stringArray = string.split(":");
                this.madmin.deleteTopic(stringArray[0], stringArray[1]);
            }
            for (sleepMaxTries = 5; sleepMaxTries > 0; --sleepMaxTries) {
                try {
                    Thread.sleep(150L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                subscribed = listener.assignment();
                if (subscribed.size() == 0) break;
            }
            if ((subscribed = listener.assignment()).size() != 0) {
                throw new IOException("Found subscription after topic delete");
            }
            for (String string : topicList) {
                tokens = string.split(":");
                this.madmin.createTopic(tokens[0], tokens[1], this.numPartitions);
            }
            for (sleepMaxTries = 5; sleepMaxTries > 0; --sleepMaxTries) {
                try {
                    Thread.sleep(150L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                subscribed = listener.assignment();
                if (subscribed.size() == this.numStreams * totalNumTopics * this.numPartitions) break;
            }
            if ((subscribed = listener.assignment()).size() != this.numStreams * totalNumTopics * this.numPartitions) {
                throw new IOException("Found subscription after topic delete");
            }
            for (TopicPartition topicPartition : subscribed) {
                try {
                    OffsetAndMetadata offset = listener.committed(topicPartition);
                    long c = 0L;
                    if (offset != null) {
                        c = listener.committed(topicPartition).offset();
                    }
                    if (c == 0L) continue;
                    throw new IOException("Cursor found for a recreated topic");
                }
                catch (KafkaException offset) {
                }
            }
            for (String string : topicList) {
                tokens = string.split(":");
                this.madmin.deleteTopic(tokens[0], tokens[1]);
            }
            if (this.topicSubscription && this.groupId != null) {
                cb.WaitForAllUnsubscribed();
            }
            for (String string : topicList) {
                tokens = string.split(":");
                this.madmin.createTopic(tokens[0], tokens[1], this.numPartitions / 2);
            }
            try {
                for (sleepMaxTries = 5; sleepMaxTries > 0; --sleepMaxTries) {
                    try {
                        Thread.sleep(150L);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    subscribed = listener.assignment();
                    if (subscribed.size() != this.numStreams * totalNumTopics * (this.numPartitions / 2)) {
                        continue;
                    }
                    break;
                }
            }
            catch (Exception exception) {
                // empty catch block
            }
            subscribed = listener.assignment();
            if (subscribed.size() != this.numStreams * totalNumTopics * (this.numPartitions / 2)) {
                throw new IOException("Found " + subscribed.size() + " subscription after topic delete Expected:" + this.numStreams * totalNumTopics * (this.numPartitions / 2));
            }
            System.out.println("Unsubscribing");
            if (this.topicSubscription) {
                listener.unsubscribe();
                if (this.groupId != null) {
                    cb.WaitForAllUnsubscribed();
                }
            } else {
                listener.unsubscribe();
                Set set = listener.assignment();
                if (set.size() != 0) {
                    throw new IOException("Unsubscribe failed");
                }
            }
            System.out.println("The test completed successfully");
        }
        finally {
            if (listener != null) {
                listener.close();
            }
            try {
                if (tt != null) {
                    tt.join();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void DeleteStream() {
        try {
            this.madmin.deleteStream(this.streamName + "0");
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void ChangePerm() {
        try {
            StreamDescriptor sdesc = Streams.newStreamDescriptor();
            System.out.println("Change listen perms for " + this.streamName + "0");
            sdesc.setConsumePerms("u:0 | g:0");
            this.madmin.editStream(this.streamName + "0", sdesc);
            sdesc = Streams.newStreamDescriptor();
            System.out.println("Change admin perms for " + this.streamName + "0");
            sdesc.setAdminPerms("u:0 | g:0");
            this.madmin.editStream(this.streamName + "0", sdesc);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void VerifyAndAddStats(ConsumerRecords<byte[], byte[]> recs, PerfStats stats) throws IOException {
        Iterator iter = recs.iterator();
        long numBytes = 0L;
        long recSize = 0L;
        Hashtable<PartitionInfo, Long> partitionMsgBytesMap = null;
        Hashtable<PartitionInfo, Long> partitionMsgsMap = null;
        int recsCount = recs.count();
        if (this.maxPollRecords != Integer.MAX_VALUE && recsCount > this.maxPollRecords) {
            System.out.println("Total records returned by poll " + recsCount + " is greater than " + this.maxPollRecords);
            throw new IOException("Total records returned by poll " + recsCount + " is greater than " + this.maxPollRecords);
        }
        if (this.verifyKeys) {
            partitionMsgBytesMap = new Hashtable<PartitionInfo, Long>();
            partitionMsgsMap = new Hashtable<PartitionInfo, Long>();
        }
        while (iter.hasNext()) {
            ConsumerRecord rec = (ConsumerRecord)iter.next();
            numBytes = this.getBytesAndVerify(numBytes, partitionMsgBytesMap, partitionMsgsMap, (ConsumerRecord<byte[], byte[]>)rec);
        }
        if (this.verifyKeys) {
            Long totalSize = 0L;
            for (PartitionInfo key : partitionMsgBytesMap.keySet()) {
                Long size = partitionMsgBytesMap.get(key);
                Long numMsgs = partitionMsgsMap.get(key);
                totalSize = totalSize + size;
                if (numMsgs <= 1L || size <= (long)this.maxPartitionFetchSize) continue;
                System.out.println("Total partition msgbytes fetched " + size + " is greater than " + this.maxPartitionFetchSize);
                throw new IOException("Total partition msgbytes fetched " + size + " is greater than " + this.maxPartitionFetchSize);
            }
            if (recsCount > 1 && totalSize > (long)this.maxFetchSize) {
                System.out.println("Total msgbytes fetched " + totalSize + " is greater than " + this.maxFetchSize);
                throw new IOException("Total msgbytes fetched " + totalSize + " is greater than " + this.maxFetchSize);
            }
        }
        stats.report(numBytes, recs.count());
    }

    protected long getHeadersSize(ConsumerRecord<byte[], byte[]> rec) throws IOException {
        return 0L;
    }

    private long getBytesAndVerify(long numBytes, Hashtable<PartitionInfo, Long> partitionMsgBytesMap, Hashtable<PartitionInfo, Long> partitionMsgsMap, ConsumerRecord<byte[], byte[]> rec) throws IOException, IOException, IOException, IOException, IOException, IOException, IOException {
        if (this.isTracingEnabled) {
            System.out.println(rec);
        }
        byte[] key = null;
        byte[] value = null;
        try {
            key = (byte[])rec.key();
            value = (byte[])rec.value();
        }
        catch (Exception e) {
            throw new IOException("ConsumerRecord Exception");
        }
        long recSize = key.length + value.length;
        numBytes += this.getHeadersSize(rec);
        if (this.verifyKeys) {
            String keyStr = new String(key, "UTF-8");
            String[] tokens = keyStr.split(":");
            if (tokens.length != 4) {
                throw new IOException("Key " + keyStr + " not of correct format");
            }
            int partition = Integer.parseInt(tokens[2]);
            int seq = Integer.parseInt(tokens[3]);
            if (this.isTracingEnabled) {
                System.out.println("Key " + keyStr + " ntokens " + tokens.length);
            }
            if (tokens[0].length() != this.streamName.length() + 1 || !tokens[0].startsWith(this.streamName) || Integer.parseInt(tokens[0].substring(this.streamName.length())) >= this.numStreams) {
                throw new IOException("StreamName in key " + tokens[0] + " didnot match intial " + this.streamName + " and numstreams " + this.numStreams);
            }
            String recTopic = rec.topic();
            String[] recTopicTokens = recTopic.split(":");
            if (recTopicTokens.length != 2 || !tokens[1].equals(recTopicTokens[1])) {
                throw new IOException("Topic in key " + tokens[1] + " mismatched. Expected " + recTopicTokens[1]);
            }
            if (partition != rec.partition()) {
                throw new IOException("Partition in key " + partition + " mismatched. Expected " + rec.partition());
            }
            if (this.keysInOrder) {
                this.VerifyPollingOrderedKey(tokens[0], tokens[1], partition, seq);
            } else {
                this.VerifyPollingUnorderedKey(tokens[0], tokens[1], partition, seq);
            }
            this.AddPerPartitonRecs(partitionMsgBytesMap, partitionMsgsMap, tokens[0], tokens[1], partition, recSize);
        }
        this.verifyHeaders(rec);
        return numBytes;
    }

    protected void verifyHeaders(ConsumerRecord<byte[], byte[]> rec) throws IOException {
    }

    protected void VerifyPollingOrderedKey(String stName, String tpName, int partition, int seq) throws IOException {
        PartitionInfo pinfo = new PartitionInfo(stName, tpName, partition);
        Integer mappedSeq = this.partitionSeqMap.get(pinfo);
        int expSeq = 0;
        if (mappedSeq != null) {
            expSeq = mappedSeq + 1;
        }
        if (seq != expSeq) {
            throw new IOException("Current Seq " + seq + " for Stream " + stName + " Topic " + tpName + " partition " + partition + " mismatched. Expected " + expSeq);
        }
        this.partitionSeqMap.put(pinfo, seq);
    }

    protected void AddPerPartitonRecs(Hashtable<PartitionInfo, Long> pMap, Hashtable<PartitionInfo, Long> pMsgMap, String stName, String tpName, int partition, long recSize) {
        PartitionInfo pinfo = new PartitionInfo(stName, tpName, partition);
        Long oldSize = pMap.get(pinfo);
        long updatedSize = recSize;
        if (oldSize != null) {
            updatedSize += oldSize.longValue();
        }
        pMap.put(pinfo, updatedSize);
        Long oldNum = pMsgMap.get(pinfo);
        long updatedNum = 1L;
        if (oldSize != null) {
            ++updatedNum;
        }
        pMsgMap.put(pinfo, updatedNum);
    }

    protected void VerifyPollingUnorderedKey(String stName, String tpName, int partition, int seq) throws IOException {
        PartitionInfo pinfo = new PartitionInfo(stName, tpName, partition);
        boolean[] bitArray = this.partitionBArrayMap.get(pinfo);
        if (bitArray == null) {
            bitArray = new boolean[this.numExpectedMsgs];
            this.partitionBArrayMap.put(pinfo, bitArray);
        }
        if (bitArray[seq] && !this.allowDuplicateKeys) {
            throw new IOException("Duplicate key for Stream " + stName + " Topic " + tpName + " partition " + partition + ". seq is " + seq);
        }
        bitArray[seq] = true;
    }

    private int NumExpectedMsgs(String topicName) {
        if (Integer.parseInt(topicName.substring(topicIntial.length())) < this.numTopics) {
            return this.numExpectedMsgs;
        }
        return this.numExpectedMsgs / 1000;
    }

    private void VerifyPollingEnd() throws IOException {
        int totalNumTopics = this.numTopics + this.numSlowTopics;
        int totalPartitions = this.numStreams * totalNumTopics * this.numPartitions;
        if (this.keysInOrder) {
            if (this.partitionSeqMap.size() != totalPartitions) {
                throw new IOException("Total entries in hashmap " + this.partitionSeqMap.size() + ", expected " + totalPartitions);
            }
            Set<Map.Entry<PartitionInfo, Integer>> set = this.partitionSeqMap.entrySet();
            for (Map.Entry<PartitionInfo, Integer> entry : set) {
                int lastExpectedSeq = this.NumExpectedMsgs(entry.getKey().topicName()) - 1;
                if (entry.getValue() == lastExpectedSeq) continue;
                throw new IOException(entry.getKey() + ", Last seq received " + entry.getValue() + ", expected " + lastExpectedSeq);
            }
        } else {
            if (this.partitionBArrayMap.size() != totalPartitions) {
                throw new IOException("Total entries in hashmap " + this.partitionBArrayMap.size() + ", expected " + totalPartitions);
            }
            Set<Map.Entry<PartitionInfo, boolean[]>> set = this.partitionBArrayMap.entrySet();
            for (Map.Entry<PartitionInfo, boolean[]> entry : set) {
                boolean[] barray = entry.getValue();
                for (int i = 0; i < this.numExpectedMsgs; ++i) {
                    if (barray[i]) continue;
                    throw new IOException("Message with seq " + i + " missing");
                }
            }
        }
    }

    private class TedThread
    implements Runnable {
        private TED_ACTION action;
        private Listener listener;

        public TedThread(TED_ACTION action, Listener listener2) {
            this.action = action;
            this.listener = listener2;
        }

        @Override
        public void run() {
            try {
                Random randomGenerator = new Random();
                int numSecs = 5 + randomGenerator.nextInt(10);
                try {
                    Thread.sleep(numSecs * 100);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                if (this.action == TED_ACTION.kDeleteStream) {
                    System.out.println("Ted Action DeleteStream");
                    this.listener.DeleteStream();
                } else if (this.action == TED_ACTION.kChangePerm) {
                    System.out.println("Ted Action ChangePerm");
                    this.listener.ChangePerm();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static enum TED_ACTION {
        kNone(0),
        kDeleteStream(1),
        kChangePerm(2);

        private final int val;

        private TED_ACTION(int val) {
            this.val = val;
        }

        public int getValue() {
            return this.val;
        }

        static TED_ACTION parseTedAction(int val) {
            switch (val) {
                case 0: {
                    return kNone;
                }
                case 1: {
                    return kDeleteStream;
                }
                case 2: {
                    return kChangePerm;
                }
            }
            System.out.println("Invalid ted action " + val);
            return kNone;
        }
    }

    protected class PartitionInfo {
        private final String streamName;
        private final String topicName;
        private final int partitionId;

        public PartitionInfo(String streamName, String topicName, int partitionId) {
            this.streamName = streamName;
            this.topicName = topicName;
            this.partitionId = partitionId;
        }

        public boolean equals(Object anObject) {
            PartitionInfo pinfo;
            if (this == anObject) {
                return true;
            }
            return anObject instanceof PartitionInfo && this.streamName.equals((pinfo = (PartitionInfo)anObject).streamName()) && this.topicName.equals(pinfo.topicName()) && this.partitionId == pinfo.partitionId();
        }

        public int hashCode() {
            return this.topicName.hashCode() + this.partitionId;
        }

        public String toString() {
            return "Stream: " + this.streamName + " Topic: " + this.topicName + " Partition: " + this.partitionId;
        }

        public String streamName() {
            return this.streamName;
        }

        public String topicName() {
            return this.topicName;
        }

        public int partitionId() {
            return this.partitionId;
        }
    }

    private final class PerfStats {
        private long startTime = System.currentTimeMillis();
        private long endTime = -1L;
        private long totalBytes = 0L;
        private long totalMsgs = 0L;

        public synchronized void report(long bytes, long numMsgs) {
            this.totalBytes += bytes;
            this.totalMsgs += numMsgs;
        }

        public synchronized void printReport() {
            this.endTime = System.currentTimeMillis();
            System.out.println("Total time (ms): " + (this.endTime - this.startTime));
            System.out.println("Total bytes received: " + this.totalBytes);
            System.out.println("Total messages received: " + this.totalMsgs);
            long bytesInKb = this.totalBytes / 1024L;
            System.out.println("Average nKBs/sec: " + (double)bytesInKb * 1.0 / (double)(this.endTime - this.startTime) * 1000.0);
            System.out.println("Average nMsgs/sec: " + (double)this.totalMsgs * 1.0 / (double)(this.endTime - this.startTime) * 1000.0);
        }
    }

    private static final class RebCb
    implements ConsumerRebalanceListener {
        List<TopicPartition> subscribed;
        boolean[] actuallySubscribed;
        boolean allSubscribed = false;
        boolean allUnsubscribed = true;
        List<Consumer<?, ?>> consumers = new ArrayList();

        RebCb(List<TopicPartition> partitions) {
            this.subscribed = partitions;
            this.actuallySubscribed = new boolean[partitions.size()];
            for (int i = 0; i < this.actuallySubscribed.length; ++i) {
                this.actuallySubscribed[i] = false;
            }
        }

        public void addConsumer(Consumer<?, ?> consumer) {
            this.consumers.add(consumer);
        }

        public void Reallocate(List<TopicPartition> partitions) {
            boolean[] newArr = new boolean[partitions.size()];
            for (int i = 0; i < partitions.size(); ++i) {
                for (int j = 0; j < this.subscribed.size(); ++j) {
                    if (!partitions.get(i).equals((Object)this.subscribed.get(j))) continue;
                    newArr[i] = this.actuallySubscribed[j];
                }
            }
            this.subscribed = partitions;
            this.actuallySubscribed = newArr;
            this.allSubscribed = false;
        }

        public void WaitForRevoke() {
            while (this.AllSubscribed()) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
        }

        public void WaitForAllSubscribed() {
            while (!this.AllSubscribed()) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
        }

        public void WaitForAllUnsubscribed() {
            while (!this.AllUnsubscribed()) {
                try {
                    this.wait();
                }
                catch (Exception exception) {}
            }
        }

        public synchronized boolean AllSubscribed() {
            return this.allSubscribed;
        }

        public synchronized boolean AllUnsubscribed() {
            return this.allUnsubscribed;
        }

        public synchronized void setAllSubscribed() {
            for (int i = 0; i < this.actuallySubscribed.length; ++i) {
                if (this.actuallySubscribed[i]) continue;
                return;
            }
            System.out.println("setting subscribed to true");
            this.allSubscribed = true;
        }

        public synchronized void setAllUnsubscribed() {
            for (int i = 0; i < this.actuallySubscribed.length; ++i) {
                if (!this.actuallySubscribed[i]) continue;
                return;
            }
            System.out.println("setting unsubscribed to true");
            this.allUnsubscribed = true;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.Assigned(partitions);
        }

        public synchronized void Assigned(Collection<TopicPartition> partitions) {
            block0: for (TopicPartition partition : partitions) {
                for (int i = 0; i < this.subscribed.size(); ++i) {
                    if (!partition.equals((Object)this.subscribed.get(i))) continue;
                    Assert.assertTrue((!this.actuallySubscribed[i] ? 1 : 0) != 0);
                    this.actuallySubscribed[i] = true;
                    this.allUnsubscribed = false;
                    continue block0;
                }
            }
            this.setAllSubscribed();
            this.notifyAll();
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.Revoked(partitions);
            if (this.allUnsubscribed) {
                for (Consumer<?, ?> consumer : this.consumers) {
                    ConsumerRecords recs = consumer.poll(10L);
                    if (recs.count() == 0) continue;
                    System.out.println("Got messages after all revoked");
                }
            }
        }

        public synchronized void Revoked(Collection<TopicPartition> partitions) {
            block0: for (TopicPartition partition : partitions) {
                for (int i = 0; i < this.subscribed.size(); ++i) {
                    if (!partition.equals((Object)this.subscribed.get(i))) continue;
                    Assert.assertTrue((this.actuallySubscribed[i] ? 1 : 0) != 0);
                    this.actuallySubscribed[i] = false;
                    this.allSubscribed = false;
                    continue block0;
                }
            }
            this.setAllUnsubscribed();
            this.notifyAll();
        }
    }
}

