package com.mapr.streams.listener;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.Admin;
import com.mapr.streams.StreamDescriptor;
import com.mapr.streams.Streams;
import com.mapr.streams.impl.admin.CursorInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.Assert;

/* loaded from: input_file:com/mapr/streams/listener/Listener.class */
public class Listener implements Runnable {
    // Prefix of every topic name: topics are addressed as
    // <streamName><streamIndex>:<topicIntial><topicIndex>.
    public static String topicIntial = new String("topic");
    // Hadoop configuration handed to Streams.newAdmin() in the constructor.
    private Configuration conf = new Configuration();
    // Admin handle created in the constructor; stays null if creation failed there.
    private Admin madmin;
    // Stream name prefix; the stream index is appended to it to form each stream name.
    private String streamName;
    // Number of streams iterated over when building the topic list.
    private int numStreams;
    // Regular and "slow" topic counts; their sum is the number of topics per stream.
    private int numTopics;
    private int numSlowTopics;
    // Number of partitions enumerated per topic.
    private int numPartitions;
    // NOTE(review): presumably the message count expected per topic feed (see the usage text) - confirm.
    private int numExpectedMsgs;
    // NOTE(review): presumably forwarded as "max.partition.fetch.bytes" - confirm in runWithException().
    private int maxPartitionFetchSize;
    private boolean verifyKeys;
    // Selects which tracking table is allocated: partitionSeqMap when true, partitionBArrayMap otherwise.
    private boolean keysInOrder;
    private boolean allowDuplicateKeys;
    private boolean isTracingEnabled;
    private boolean printStats;
    // Fault-injection action requested for this run (see TED_ACTION).
    private TED_ACTION action;
    private boolean topicSubscription;
    // Consumer group id; may be null.
    private String groupId;
    // Allocated only when keysInOrder is true.
    private Hashtable<PartitionInfo, Integer> partitionSeqMap;
    // Allocated only when keysInOrder is false.
    private Hashtable<PartitionInfo, boolean[]> partitionBArrayMap;
    // Result flag: reset to false at the start of runWithException() and read by main()/runTest().
    public boolean status;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/listener/Listener$PartitionInfo.class */
    /**
     * Immutable identifier of one partition: stream name, topic name and partition id.
     * Instances are used as hash-table keys, so equals() and hashCode() cover the
     * same three components.
     */
    public class PartitionInfo {
        private final String streamName;
        private final String topicName;
        private final int partitionId;

        /**
         * @param str  stream name
         * @param str2 topic name
         * @param i    partition id
         */
        public PartitionInfo(String str, String str2, int i) {
            this.streamName = str;
            this.topicName = str2;
            this.partitionId = i;
        }

        /** Two instances are equal when stream, topic and partition id all match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PartitionInfo)) {
                return false;
            }
            PartitionInfo other = (PartitionInfo) obj;
            return this.streamName.equals(other.streamName)
                    && this.topicName.equals(other.topicName)
                    && this.partitionId == other.partitionId;
        }

        /**
         * Hash over all three components compared by equals(). Leaving the stream
         * name out made the same topic/partition in different streams always collide.
         */
        @Override
        public int hashCode() {
            int hash = this.streamName.hashCode();
            hash = 31 * hash + this.topicName.hashCode();
            hash = 31 * hash + this.partitionId;
            return hash;
        }

        @Override
        public String toString() {
            return "Stream: " + this.streamName + " Topic: " + this.topicName + " Partition: " + this.partitionId;
        }

        /** @return the stream name */
        public String streamName() {
            return this.streamName;
        }

        /** @return the topic name */
        public String topicName() {
            return this.topicName;
        }

        /** @return the partition id */
        public int partitionId() {
            return this.partitionId;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/listener/Listener$PerfStats.class */
    /**
     * Accumulates received byte and message counts and prints throughput figures.
     * The clock starts when the instance is created and stops when printReport() runs.
     */
    public final class PerfStats {
        private long endTime = -1;
        private long totalBytes = 0;
        private long totalMsgs = 0;
        private long startTime = System.currentTimeMillis();

        public PerfStats() {
        }

        /**
         * Adds one batch to the running totals.
         *
         * @param j  number of bytes received
         * @param j2 number of messages received
         */
        public synchronized void report(long j, long j2) {
            this.totalBytes += j;
            this.totalMsgs += j2;
        }

        /** Stops the clock and prints totals and per-second averages to standard output. */
        public synchronized void printReport() {
            this.endTime = System.currentTimeMillis();
            long elapsedMs = this.endTime - this.startTime;
            // A report printed within the same millisecond would otherwise divide by zero
            // and show Infinity/NaN; treat anything shorter than 1 ms as 1 ms for the rates.
            long rateWindowMs = Math.max(1L, elapsedMs);
            // Divide in floating point: the integer division dropped every sub-KB remainder,
            // so totals under 1024 bytes were reported as 0 KB/s.
            double kiloBytes = this.totalBytes / 1024.0d;
            System.out.println("Total time (ms): " + elapsedMs);
            System.out.println("Total bytes received: " + this.totalBytes);
            System.out.println("Total messages received: " + this.totalMsgs);
            System.out.println("Average nKBs/sec: " + ((kiloBytes / rateWindowMs) * 1000.0d));
            System.out.println("Average nMsgs/sec: " + (((this.totalMsgs * 1.0d) / rateWindowMs) * 1000.0d));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/listener/Listener$RebCb.class */
    /**
     * Rebalance listener shared by several consumers. It tracks which of the expected
     * partitions are currently assigned and lets a test thread block until all of them
     * are assigned, until at least one has been revoked, or until none are assigned.
     *
     * All state is guarded by this object's monitor; the rebalance callbacks call
     * notifyAll() after every change so that the WaitFor* methods can re-check it.
     */
    public static final class RebCb implements ConsumerRebalanceListener {
        List<TopicPartition> subscribed;
        boolean[] actuallySubscribed;
        boolean allSubscribed = false;
        boolean allUnsubscribed = true;
        List<Consumer<?, ?>> consumers = new ArrayList<>();

        /**
         * @param list partitions that are expected to be assigned; all start as unassigned
         */
        RebCb(List<TopicPartition> list) {
            this.subscribed = list;
            // A new boolean[] is already all-false, i.e. "nothing assigned yet".
            this.actuallySubscribed = new boolean[list.size()];
        }

        /** Registers a consumer that is polled once everything has been revoked. */
        public synchronized void addConsumer(Consumer<?, ?> consumer) {
            this.consumers.add(consumer);
        }

        /**
         * Replaces the expected partition list, carrying over the assigned flag of every
         * partition that was already tracked. Clears the "all subscribed" flag.
         * Synchronized because the callbacks read and write the same arrays.
         */
        public synchronized void Reallocate(List<TopicPartition> list) {
            boolean[] carriedOver = new boolean[list.size()];
            for (int i = 0; i < list.size(); i++) {
                int previous = this.subscribed.lastIndexOf(list.get(i));
                if (previous >= 0) {
                    carriedOver[i] = this.actuallySubscribed[previous];
                }
            }
            this.subscribed = list;
            this.actuallySubscribed = carriedOver;
            this.allSubscribed = false;
        }

        /**
         * Blocks while every expected partition is assigned. The method must own the
         * monitor: calling wait() without it throws IllegalMonitorStateException, which
         * used to be swallowed and turned the loop into a busy spin.
         */
        public synchronized void WaitForRevoke() {
            boolean interrupted = false;
            while (this.allSubscribed) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting for the condition, but remember the interrupt.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** Blocks until every expected partition is assigned. */
        public synchronized void WaitForAllSubscribed() {
            boolean interrupted = false;
            while (!this.allSubscribed) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** Blocks until no expected partition is assigned. */
        public synchronized void WaitForAllUnsubscribed() {
            boolean interrupted = false;
            while (!this.allUnsubscribed) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /** @return true when every expected partition is currently assigned */
        public synchronized boolean AllSubscribed() {
            return this.allSubscribed;
        }

        /** @return true when no expected partition is currently assigned */
        public synchronized boolean AllUnsubscribed() {
            return this.allUnsubscribed;
        }

        /** Sets the "all subscribed" flag if every tracked partition is assigned. */
        public synchronized void setAllSubscribed() {
            for (boolean assigned : this.actuallySubscribed) {
                if (!assigned) {
                    return;
                }
            }
            System.out.println("setting subscribed to true");
            this.allSubscribed = true;
        }

        /** Sets the "all unsubscribed" flag if no tracked partition is assigned. */
        public synchronized void setAllUnsubscribed() {
            for (boolean assigned : this.actuallySubscribed) {
                if (assigned) {
                    return;
                }
            }
            System.out.println("setting unsubscribed to true");
            this.allUnsubscribed = true;
        }

        /** Rebalance callback: records the newly assigned partitions. */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            Assigned(collection);
        }

        /**
         * Marks each tracked partition in the collection as assigned and wakes up waiters.
         * Asserts that a partition is not assigned twice. Partitions that are not tracked
         * are ignored.
         */
        public synchronized void Assigned(Collection<TopicPartition> collection) {
            for (TopicPartition topicPartition : collection) {
                // Handle the first match exactly once. The previous loop never advanced
                // past a match, so it re-checked the same slot and failed its own assertion.
                int slot = this.subscribed.indexOf(topicPartition);
                if (slot >= 0) {
                    Assert.assertTrue(!this.actuallySubscribed[slot]);
                    this.actuallySubscribed[slot] = true;
                    this.allUnsubscribed = false;
                }
            }
            setAllSubscribed();
            notifyAll();
        }

        /**
         * Rebalance callback: records the revoked partitions and, once nothing is assigned
         * any more, polls every registered consumer to report unexpected messages.
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            Revoked(collection);
            List<Consumer<?, ?>> toPoll;
            synchronized (this) {
                if (!this.allUnsubscribed) {
                    return;
                }
                // Poll a snapshot outside the lock so addConsumer() cannot race the iteration
                // and the monitor is not held during the poll.
                toPoll = new ArrayList<Consumer<?, ?>>(this.consumers);
            }
            for (Consumer<?, ?> registered : toPoll) {
                if (registered.poll(10L).count() != 0) {
                    System.out.println("Got messages after all revoked");
                }
            }
        }

        /**
         * Marks each tracked partition in the collection as no longer assigned and wakes
         * up waiters. Asserts that a revoked partition was assigned before.
         */
        public synchronized void Revoked(Collection<TopicPartition> collection) {
            for (TopicPartition topicPartition : collection) {
                // Same single-match handling as in Assigned().
                int slot = this.subscribed.indexOf(topicPartition);
                if (slot >= 0) {
                    Assert.assertTrue(this.actuallySubscribed[slot]);
                    this.actuallySubscribed[slot] = false;
                    this.allSubscribed = false;
                }
            }
            setAllUnsubscribed();
            notifyAll();
        }
    }

    /* loaded from: input_file:com/mapr/streams/listener/Listener$TED_ACTION.class */
    /** Fault-injection action selected by a numeric code. */
    public enum TED_ACTION {
        kNone(0),
        kDeleteStream(1),
        kChangePerm(2);

        private final int val;

        TED_ACTION(int i) {
            this.val = i;
        }

        /** @return the numeric code of this action */
        public int getValue() {
            return this.val;
        }

        /**
         * Maps a numeric code to its action.
         *
         * @param i numeric code
         * @return the matching action; kNone (after printing a notice) for an unknown code
         */
        static TED_ACTION parseTedAction(int i) {
            for (TED_ACTION candidate : values()) {
                if (candidate.val == i) {
                    return candidate;
                }
            }
            System.out.println("Invalid ted action " + i);
            return kNone;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/mapr/streams/listener/Listener$TedThread.class */
    /**
     * Runnable that waits a short random time and then performs the configured
     * fault-injection action on a Listener.
     */
    public class TedThread implements Runnable {
        private TED_ACTION action;
        private Listener listener;

        /**
         * @param ted_action action to perform after the delay
         * @param listener   listener on which the action is invoked
         */
        public TedThread(TED_ACTION ted_action, Listener listener) {
            this.action = ted_action;
            this.listener = listener;
        }

        /** Sleeps for 500 to 1400 ms in 100 ms steps; any failure while sleeping is ignored. */
        private void pauseBeforeAction() {
            try {
                int hundredsOfMillis = 5 + new Random().nextInt(10);
                Thread.sleep(hundredsOfMillis * 100);
            } catch (Exception ignored) {
            }
        }

        /** Delays, then runs the action; exceptions from the action are printed, not rethrown. */
        @Override // java.lang.Runnable
        public void run() {
            try {
                pauseBeforeAction();
                TED_ACTION requested = this.action;
                if (requested == TED_ACTION.kDeleteStream) {
                    System.out.println("Ted Action DeleteStream");
                    this.listener.DeleteStream();
                    return;
                }
                if (requested == TED_ACTION.kChangePerm) {
                    System.out.println("Ted Action ChangePerm");
                    this.listener.ChangePerm();
                }
            } catch (Exception failure) {
                failure.printStackTrace();
            }
        }
    }

    /**
     * Prints the command-line synopsis to standard error and terminates the JVM with
     * exit status 1. The synopsis lists every option that main() accepts.
     */
    public static void usage() {
        System.err.println("Listener -path <stream-intialname> [-nstreams <num streams> -ntopics <num topics> -nslowtopics <num topics> -npart <numpartitions per topic> -nmsgs <num messages per topicfeed>"
                + " -verifykeys <true|false> -keysinorder <true|false> -debug <true|false> -allowduplicates <true|false>"
                + " -tedaction <0|1|2> -topicsubscr <true|false> -groupid <group id> -maxfetchsize <max partition fetch size>]");
        System.exit(1);
    }

    /**
     * Command-line entry point: parses the options, runs one Listener on its own thread,
     * waits for it to finish and prints a failure notice when its status is false.
     * Every option takes exactly one value; an unknown option or a missing value ends
     * the program through usage().
     *
     * @param strArr command-line arguments
     * @throws Exception if a numeric value cannot be parsed or the wait is interrupted
     */
    public static void main(String[] strArr) throws Exception {
        String streamPath = null;
        int streamCount = 2;
        int topicCount = 2;
        int slowTopicCount = 2;
        int partitionCount = 4;
        int expectedMsgs = 1000;
        boolean verifyKeys = true;
        boolean keysInOrder = true;
        boolean allowDuplicates = false;
        boolean tracing = false;
        TED_ACTION tedAction = TED_ACTION.kNone;
        boolean topicSubscription = false;
        String consumerGroup = null;
        int maxFetchSize = 65536;

        for (int pos = 0; pos < strArr.length; pos++) {
            switch (strArr[pos]) {
                case "-path":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    streamPath = strArr[pos];
                    break;
                case "-nstreams":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    streamCount = Integer.parseInt(strArr[pos]);
                    break;
                case "-ntopics":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    topicCount = Integer.parseInt(strArr[pos]);
                    break;
                case "-nslowtopics":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    slowTopicCount = Integer.parseInt(strArr[pos]);
                    break;
                case "-npart":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    partitionCount = Integer.parseInt(strArr[pos]);
                    break;
                case "-nmsgs":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    expectedMsgs = Integer.parseInt(strArr[pos]);
                    break;
                case "-verifykeys":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    verifyKeys = Boolean.parseBoolean(strArr[pos]);
                    break;
                case "-keysinorder":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    keysInOrder = Boolean.parseBoolean(strArr[pos]);
                    break;
                case "-debug":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    tracing = Boolean.parseBoolean(strArr[pos]);
                    break;
                case "-allowduplicates":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    allowDuplicates = Boolean.parseBoolean(strArr[pos]);
                    break;
                case "-tedaction":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    tedAction = TED_ACTION.parseTedAction(Integer.parseInt(strArr[pos]));
                    System.out.println("Setting tedaction to " + tedAction);
                    break;
                case "-topicsubscr":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    topicSubscription = Boolean.parseBoolean(strArr[pos]);
                    break;
                case "-groupid":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    consumerGroup = strArr[pos];
                    break;
                case "-maxfetchsize":
                    if (++pos >= strArr.length) {
                        usage();
                    }
                    maxFetchSize = Integer.parseInt(strArr[pos]);
                    break;
                default:
                    usage();
                    break;
            }
        }

        // Statistics printing is always enabled when started from the command line.
        Listener listener = new Listener(streamPath, streamCount, topicCount, slowTopicCount, partitionCount,
                expectedMsgs, verifyKeys, keysInOrder, allowDuplicates, tracing, true, tedAction,
                topicSubscription, consumerGroup, maxFetchSize);
        Thread worker = new Thread(listener);
        worker.start();
        worker.join();
        if (!listener.status) {
            System.out.println("Listener test failed.");
        }
    }

    /**
     * Listener-group scenario: checks how partitions are spread over consumers while
     * consumers join and leave and while topics are created, widened, deleted and recreated.
     *
     * Steps: subscribe L1 and create the topics; add L2; double the partition count of
     * every topic; add L3; unsubscribe L2; delete and recreate all topics; unsubscribe
     * L1 and L3. After each rebalance the total number of assigned partitions, the
     * number of distinct subscribed topics and the per-consumer share are verified.
     *
     * Every consumer created here is closed before returning, also on failure.
     *
     * @param str  stream name prefix; the stream index is appended to it
     * @param i    number of streams
     * @param i2   number of topics per stream
     * @param i3   initial number of partitions per topic
     * @param z    not used by this method
     * @param str2 consumer group id, or null to leave "group.id" unset
     * @return true when every check passed, false on the first failed check
     */
    public static boolean runLGTest(String str, int i, int i2, int i3, boolean z, String str2) {
        Consumer<?, ?> listener1 = null;
        Consumer<?, ?> listener2 = null;
        Consumer<?, ?> listener3 = null;
        List<TopicPartition> expectedPartitions = new ArrayList<>();
        List<String> topicNames = new ArrayList<>();
        try {
            Admin admin = null;
            try {
                admin = Streams.newAdmin(new Configuration());
            } catch (Exception e) {
                // Kept as best effort: with no admin every topic operation below fails
                // and is reported per topic by its own catch block.
                e.printStackTrace();
            }
            for (int stream = 0; stream < i; stream++) {
                for (int topic = 0; topic < i2; topic++) {
                    String fullTopic = str + stream + ":" + topicIntial + topic;
                    topicNames.add(fullTopic);
                    for (int partition = 0; partition < i3; partition++) {
                        expectedPartitions.add(new TopicPartition(fullTopic, partition));
                    }
                }
            }
            Properties properties = new Properties();
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("auto.offset.reset", "earliest");
            properties.put("max.partition.fetch.bytes", 65536);
            if (str2 != null) {
                properties.put("group.id", str2);
            }
            ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
            ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
            int topicTotal = i * i2;

            // Step 1: a single listener owns every partition.
            listener1 = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
            RebCb rebCb = new RebCb(expectedPartitions);
            rebCb.addConsumer(listener1);
            listener1.subscribe(topicNames, rebCb);
            System.out.println("Creating topics");
            for (String fullTopic : topicNames) {
                try {
                    String[] parts = fullTopic.split(":");
                    admin.createTopic(parts[0], parts[1], i3);
                } catch (Exception e) {
                    System.out.println("Topic create failed");
                }
            }
            System.out.println("Waiting for topic subscriptions");
            rebCb.WaitForAllSubscribed();
            Set<TopicPartition> assigned1 = listener1.assignment();
            Set<String> topics1 = listener1.subscription();
            System.out.println("All subscribed: L1, partitions " + assigned1.size() + ", topics " + topics1.size());
            if (assigned1.size() != i * i2 * i3) {
                System.out.println("All subscribed through L1, expected " + (i * i2 * i3) + " but got " + assigned1.size());
                return false;
            }
            if (topics1.size() != topicTotal) {
                System.out.println("All subscribed through L1, expected " + (i * i2) + " but got " + topics1.size());
                return false;
            }

            // Step 2: a second listener joins and the partitions are split between the two.
            listener2 = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
            rebCb.addConsumer(listener2);
            listener2.subscribe(topicNames, rebCb);
            rebCb.WaitForRevoke();
            rebCb.WaitForAllSubscribed();
            assigned1 = listener1.assignment();
            topics1 = listener1.subscription();
            Set<TopicPartition> assigned2 = listener2.assignment();
            Set<String> topics2 = listener2.subscription();
            System.out.println("Two listeners: L1 (partitions " + assigned1.size() + ", topics " + topics1.size() + "), L2 (partitiosn " + assigned2.size() + ", topics " + topics2.size() + ")");
            if (assigned1.size() + assigned2.size() != i * i2 * i3) {
                System.out.println("Expected total subscriptions " + (i * i2 * i3) + " found:" + (assigned1.size() + assigned2.size()));
                return false;
            }
            Set<String> distinctTopics = new HashSet<>();
            distinctTopics.addAll(topics1);
            distinctTopics.addAll(topics2);
            if (distinctTopics.size() != topicTotal) {
                System.out.println("Expected total subscriptions " + (i * i2) + " found:" + distinctTopics.size());
                return false;
            }
            int maxShare = ((i3 / 2) + 1) * i * i2;
            int minShare = (i3 / 2) * i * i2;
            if (assigned1.size() < minShare || assigned1.size() > maxShare || assigned2.size() < minShare || assigned2.size() > maxShare) {
                System.out.println("l1 subscription size " + assigned1.size());
                System.out.println("l2 subscription size " + assigned2.size());
                return false;
            }

            // Step 3: double the partition count of every topic.
            int widened = i3 * 2;
            List<TopicPartition> widenedPartitions = new ArrayList<>();
            for (int stream = 0; stream < i; stream++) {
                for (int topic = 0; topic < i2; topic++) {
                    String fullTopic = str + stream + ":" + topicIntial + topic;
                    for (int partition = 0; partition < widened; partition++) {
                        widenedPartitions.add(new TopicPartition(fullTopic, partition));
                    }
                }
            }
            rebCb.Reallocate(widenedPartitions);
            for (String fullTopic : topicNames) {
                try {
                    String[] parts = fullTopic.split(":");
                    admin.editTopic(parts[0], parts[1], widened);
                } catch (Exception e) {
                    System.out.println("Topic edit failed");
                }
            }
            rebCb.WaitForAllSubscribed();
            System.out.println("Checking subscriptions after partition change");
            assigned1 = listener1.assignment();
            topics1 = listener1.subscription();
            assigned2 = listener2.assignment();
            topics2 = listener2.subscription();
            System.out.println("Partition change: L1 (partitions " + assigned1.size() + ", topics " + topics1.size() + "), L2 (partitiosn " + assigned2.size() + ", topics " + topics2.size() + ")");
            if (assigned1.size() + assigned2.size() != i * i2 * widened) {
                System.out.println("Expected total subscriptions " + (i * i2 * widened) + " found:" + (assigned1.size() + assigned2.size()));
                return false;
            }
            distinctTopics.clear();
            distinctTopics.addAll(topics1);
            distinctTopics.addAll(topics2);
            if (distinctTopics.size() != topicTotal) {
                System.out.println("Expected total subscriptions " + (i * i2) + " found:" + distinctTopics.size());
                return false;
            }
            maxShare = ((widened / 2) + 1) * i * i2;
            minShare = (widened / 2) * i * i2;
            if (assigned1.size() < minShare || assigned1.size() > maxShare || assigned2.size() < minShare || assigned2.size() > maxShare) {
                System.out.println("l1 subscription size " + assigned1.size());
                System.out.println("l2 subscription size " + assigned2.size());
                return false;
            }

            // Step 4: a third listener joins.
            listener3 = new KafkaConsumer<>(properties, keyDeserializer, valueDeserializer);
            rebCb.addConsumer(listener3);
            listener3.subscribe(topicNames, rebCb);
            rebCb.WaitForRevoke();
            rebCb.WaitForAllSubscribed();
            assigned1 = listener1.assignment();
            assigned2 = listener2.assignment();
            Set<TopicPartition> assigned3 = listener3.assignment();
            topics1 = listener1.subscription();
            topics2 = listener2.subscription();
            Set<String> topics3 = listener3.subscription();
            System.out.println("New listener L3 added: L1 (partitions " + assigned1.size() + ", topics " + topics1.size() + "), L2 (partitiosn " + assigned2.size() + ", topics " + topics2.size() + "), L3 (partitions " + assigned3.size() + ", topics " + topics3.size() + ")");
            if (assigned1.size() + assigned2.size() + assigned3.size() != i * i2 * widened) {
                System.out.println("Expected total subscriptions " + (i * i2 * widened) + " found:" + (assigned1.size() + assigned2.size() + assigned3.size()));
                return false;
            }
            distinctTopics.clear();
            distinctTopics.addAll(topics1);
            distinctTopics.addAll(topics2);
            distinctTopics.addAll(topics3);
            if (distinctTopics.size() != topicTotal) {
                System.out.println("Expected total subscriptions " + (i * i2) + " found:" + distinctTopics.size());
                return false;
            }
            maxShare = ((widened / 3) + 1) * i * i2;
            minShare = (widened / 3) * i * i2;
            if (assigned1.size() < minShare || assigned1.size() > maxShare || assigned2.size() < minShare || assigned2.size() > maxShare || assigned3.size() < minShare || assigned3.size() > maxShare) {
                System.out.println("l1 subscription size " + assigned1.size());
                System.out.println("l2 subscription size " + assigned2.size());
                System.out.println("l3 subscription size " + assigned3.size());
                return false;
            }

            // Step 5: L2 leaves; L1 and L3 share everything.
            listener2.unsubscribe();
            rebCb.WaitForRevoke();
            rebCb.WaitForAllSubscribed();
            assigned1 = listener1.assignment();
            assigned2 = listener2.assignment();
            assigned3 = listener3.assignment();
            topics1 = listener1.subscription();
            topics2 = listener2.subscription();
            topics3 = listener3.subscription();
            System.out.println("L2 unsubscribed: L1 (partitions " + assigned1.size() + ", topics " + topics1.size() + "), L2 (partitiosn " + assigned2.size() + ", topics " + topics2.size() + "), L3 (partitions " + assigned3.size() + ", topics " + topics3.size() + ")");
            if (assigned1.size() + assigned3.size() != i * i2 * widened) {
                System.out.println("Expected total subscriptions " + (i * i2 * widened) + " found:" + (assigned1.size() + assigned3.size()));
                return false;
            }
            distinctTopics.clear();
            distinctTopics.addAll(topics1);
            distinctTopics.addAll(topics2);
            distinctTopics.addAll(topics3);
            if (distinctTopics.size() != topicTotal) {
                System.out.println("Expected total subscriptions " + (i * i2) + " found:" + distinctTopics.size());
                return false;
            }
            maxShare = ((widened / 2) + 1) * i * i2;
            minShare = (widened / 2) * i * i2;
            if (assigned1.size() < minShare || assigned1.size() > maxShare || assigned3.size() < minShare || assigned3.size() > maxShare) {
                System.out.println("l1 subscription size " + assigned1.size());
                // The second value belongs to L3; it used to be labelled "l2".
                System.out.println("l3 subscription size " + assigned3.size());
                return false;
            }

            // Step 6: delete and recreate the topics, then unsubscribe the remaining listeners.
            System.out.println("Delete all the topics and make sure all the partitions get unsubscribed");
            for (String fullTopic : topicNames) {
                try {
                    String[] parts = fullTopic.split(":");
                    admin.deleteTopic(parts[0], parts[1]);
                } catch (Exception e) {
                    System.out.println("Topic delete failed");
                }
            }
            rebCb.WaitForAllUnsubscribed();
            System.out.println("Recreating all topics");
            for (String fullTopic : topicNames) {
                try {
                    String[] parts = fullTopic.split(":");
                    admin.createTopic(parts[0], parts[1], widened);
                } catch (Exception e) {
                    System.out.println("Topic recreate failed");
                }
            }
            System.out.println("Waiting for all subscriptions to come back");
            rebCb.WaitForAllSubscribed();
            System.out.println("Unsubscribing all");
            listener1.unsubscribe();
            listener3.unsubscribe();
            rebCb.WaitForAllUnsubscribed();
            return true;
        } finally {
            // Close every consumer that was created; previously only L1 was closed and
            // L2/L3 leaked. Nested so that a failing close does not skip the others.
            try {
                if (listener1 != null) {
                    listener1.close();
                }
            } finally {
                try {
                    if (listener2 != null) {
                        listener2.close();
                    }
                } finally {
                    if (listener3 != null) {
                        listener3.close();
                    }
                }
            }
        }
    }

    /**
     * Runs one Listener synchronously with a caller-supplied maximum partition fetch size.
     * Key verification and in-order keys are on; duplicates, tracing, statistics printing
     * and topic subscription are off; no fault injection and no group id.
     *
     * @param str stream name prefix
     * @param i   number of streams
     * @param i2  number of topics
     * @param i3  number of slow topics
     * @param i4  number of partitions per topic
     * @param i5  number of expected messages
     * @param i6  maximum partition fetch size
     * @return the listener's status after the run
     * @throws IOException if the run fails with an I/O error
     */
    public static boolean runTestWithPollOptions(String str, int i, int i2, int i3, int i4, int i5, int i6) throws IOException {
        final boolean verifyKeys = true;
        final boolean keysInOrder = true;
        final boolean allowDuplicateKeys = false;
        final boolean tracing = false;
        final boolean printStats = false;
        final boolean topicSubscription = false;
        Listener worker = new Listener(str, i, i2, i3, i4, i5, verifyKeys, keysInOrder, allowDuplicateKeys,
                tracing, printStats, TED_ACTION.kNone, topicSubscription, null, i6);
        worker.runWithException();
        return worker.status;
    }

    /**
     * Runs one Listener synchronously with default options: key verification and in-order
     * keys on, everything else off, no fault injection, no group id, fetch size 65536.
     *
     * @param str stream name prefix
     * @param i   number of streams
     * @param i2  number of topics
     * @param i3  number of slow topics
     * @param i4  number of partitions per topic
     * @param i5  number of expected messages
     * @return the listener's status after the run
     * @throws IOException if the run fails with an I/O error
     */
    public static boolean runTest(String str, int i, int i2, int i3, int i4, int i5) throws IOException {
        final boolean verifyKeys = true;
        final boolean keysInOrder = true;
        final boolean allowDuplicateKeys = false;
        final boolean tracing = false;
        final boolean printStats = false;
        final boolean topicSubscription = false;
        Listener worker = new Listener(str, i, i2, i3, i4, i5, verifyKeys, keysInOrder, allowDuplicateKeys,
                tracing, printStats, TED_ACTION.kNone, topicSubscription, null, 65536);
        worker.runWithException();
        return worker.status;
    }

    /**
     * Runs one Listener synchronously with default options and the given consumer group id.
     *
     * @param str  stream name prefix
     * @param i    number of streams
     * @param i2   number of topics
     * @param i3   number of slow topics
     * @param i4   number of partitions per topic
     * @param i5   number of expected messages
     * @param str2 consumer group id
     * @return the listener's status after the run
     * @throws IOException if the run fails with an I/O error
     */
    public static boolean runTest(String str, int i, int i2, int i3, int i4, int i5, String str2) throws IOException {
        final boolean verifyKeys = true;
        final boolean keysInOrder = true;
        final boolean allowDuplicateKeys = false;
        final boolean tracing = false;
        final boolean printStats = false;
        final boolean topicSubscription = false;
        Listener worker = new Listener(str, i, i2, i3, i4, i5, verifyKeys, keysInOrder, allowDuplicateKeys,
                tracing, printStats, TED_ACTION.kNone, topicSubscription, str2, 65536);
        worker.runWithException();
        return worker.status;
    }

    /**
     * Runs one Listener synchronously with default options and the given fault-injection action.
     *
     * @param str        stream name prefix
     * @param i          number of streams
     * @param i2         number of topics
     * @param i3         number of slow topics
     * @param i4         number of partitions per topic
     * @param i5         number of expected messages
     * @param ted_action fault-injection action for the run
     * @return the listener's status after the run
     * @throws IOException if the run fails with an I/O error
     */
    public static boolean runTest(String str, int i, int i2, int i3, int i4, int i5, TED_ACTION ted_action) throws IOException {
        final boolean verifyKeys = true;
        final boolean keysInOrder = true;
        final boolean allowDuplicateKeys = false;
        final boolean tracing = false;
        final boolean printStats = false;
        final boolean topicSubscription = false;
        Listener worker = new Listener(str, i, i2, i3, i4, i5, verifyKeys, keysInOrder, allowDuplicateKeys,
                tracing, printStats, ted_action, topicSubscription, null, 65536);
        worker.runWithException();
        return worker.status;
    }

    /**
     * Runs one Listener synchronously with caller-chosen topic-subscription and key-order
     * settings. Key verification is always on; duplicates, tracing and statistics printing
     * are off; no fault injection; fetch size 65536.
     *
     * @param str  stream name prefix
     * @param i    number of streams
     * @param i2   number of topics
     * @param i3   number of slow topics
     * @param i4   number of partitions per topic
     * @param i5   number of expected messages
     * @param z    passed to the constructor as its topic-subscription flag
     * @param z2   passed to the constructor as its keys-in-order flag
     * @param str2 consumer group id
     * @return the listener's status after the run
     * @throws IOException if the run fails with an I/O error
     */
    public static boolean runTest(String str, int i, int i2, int i3, int i4, int i5, boolean z, boolean z2, String str2) throws IOException {
        final boolean verifyKeys = true;
        final boolean allowDuplicateKeys = false;
        final boolean tracing = false;
        final boolean printStats = false;
        Listener worker = new Listener(str, i, i2, i3, i4, i5, verifyKeys, z2, allowDuplicateKeys,
                tracing, printStats, TED_ACTION.kNone, z, str2, 65536);
        worker.runWithException();
        return worker.status;
    }

    /**
     * Stores the run parameters and creates the admin handle from the instance
     * configuration. A failure to create the admin is printed and leaves it unset.
     *
     * @param str        stream name prefix
     * @param i          number of streams
     * @param i2         number of topics
     * @param i3         number of slow topics
     * @param i4         number of partitions per topic
     * @param i5         number of expected messages
     * @param z          verify keys
     * @param z2         keys arrive in order
     * @param z3         allow duplicate keys
     * @param z4         tracing enabled
     * @param z5         print statistics
     * @param ted_action fault-injection action
     * @param z6         topic subscription flag
     * @param str2       consumer group id, may be null
     * @param i6         maximum partition fetch size
     */
    public Listener(String str, int i, int i2, int i3, int i4, int i5, boolean z, boolean z2, boolean z3, boolean z4, boolean z5, TED_ACTION ted_action, boolean z6, String str2, int i6) {
        // Every field is taken from its argument, so no defaults are stored first.
        // Topology of the test.
        this.streamName = str;
        this.numStreams = i;
        this.numTopics = i2;
        this.numSlowTopics = i3;
        this.numPartitions = i4;
        this.numExpectedMsgs = i5;
        this.maxPartitionFetchSize = i6;

        // Verification and reporting switches.
        this.verifyKeys = z;
        this.keysInOrder = z2;
        this.allowDuplicateKeys = z3;
        this.isTracingEnabled = z4;
        this.printStats = z5;

        // Subscription mode and fault injection.
        this.topicSubscription = z6;
        this.groupId = str2;
        this.action = ted_action;

        try {
            this.madmin = Streams.newAdmin(this.conf);
        } catch (Exception adminFailure) {
            adminFailure.printStackTrace();
        }
    }

    /**
     * Thread entry point: delegates to {@link #runWithException()} and prints any
     * {@link IOException} to standard output instead of propagating it.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.runWithException();
        } catch (IOException failure) {
            System.out.println(failure);
        }
    }

    /**
     * Runs the complete consumer scenario against the configured streams and topics.
     *
     * <p>Phases, in order:
     * <ol>
     *   <li>build the topic and partition lists and create the consumer;</li>
     *   <li>subscribe to topics or assign partitions and verify the resulting assignment;</li>
     *   <li>seek every partition to the beginning and poll until two consecutive polls
     *       return no records, verifying each batch;</li>
     *   <li>commit offsets and, when a group id is set and there are fewer than ten topics
     *       per stream, compare the committed offsets with the admin cursor listing;</li>
     *   <li>check position against committed offset, {@code seekToBeginning} and
     *       {@code seekToEnd};</li>
     *   <li>delete and recreate the topics (first with the same, then with half the
     *       partition count) and check that the assignment follows;</li>
     *   <li>unsubscribe.</li>
     * </ol>
     *
     * <p>{@code status} is set to {@code true} as soon as polling verification succeeds; the
     * later phases signal failure only by throwing. The consumer is closed and the TED thread
     * (if one was started) is joined on both the success and the failure path.
     *
     * @throws IOException if any verification step fails
     */
    public void runWithException() throws IOException {
        KafkaConsumer<byte[], byte[]> consumer = null;
        // Kept as ArrayList so both lists can be handed to RebCb and the consumer unchanged.
        ArrayList<TopicPartition> partitions = new ArrayList<>();
        ArrayList<String> topicNames = new ArrayList<>();
        Thread tedThread = null;
        try {
            this.status = false;
            if (this.streamName == null || this.streamName.length() == 0) {
                System.err.println("stream name cannot be empty.");
                usage();
            }
            if (this.keysInOrder) {
                this.partitionSeqMap = new Hashtable<>();
            } else {
                this.partitionBArrayMap = new Hashtable<>();
            }

            // Topic names have the form "<streamName><streamIdx>:<topicIntial><topicIdx>".
            int topicsPerStream = this.numTopics + this.numSlowTopics;
            for (int streamIdx = 0; streamIdx < this.numStreams; streamIdx++) {
                for (int topicIdx = 0; topicIdx < topicsPerStream; topicIdx++) {
                    String fullTopic = this.streamName + streamIdx + ":" + topicIntial + topicIdx;
                    topicNames.add(fullTopic);
                    for (int partitionIdx = 0; partitionIdx < this.numPartitions; partitionIdx++) {
                        partitions.add(new TopicPartition(fullTopic, partitionIdx));
                        if (this.isTracingEnabled) {
                            System.out.println("Creating a subscription for " + fullTopic + " feed:" + partitionIdx);
                        }
                    }
                }
            }
            int expectedTopics = this.numStreams * topicsPerStream;
            int expectedPartitions = expectedTopics * this.numPartitions;

            Properties properties = new Properties();
            Marlinserver.MarlinConfigDefaults defaultInstance = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.put("max.partition.fetch.bytes", Integer.valueOf(this.maxPartitionFetchSize));
            properties.put(defaultInstance.getMetadataMaxAge(), 100);
            if (this.groupId != null) {
                properties.put("group.id", this.groupId);
            }
            consumer = new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
            RebCb rebCb = new RebCb(partitions);
            rebCb.addConsumer(consumer);

            if (this.topicSubscription) {
                if (this.groupId != null) {
                    consumer.subscribe(topicNames, rebCb);
                    rebCb.WaitForAllSubscribed();
                } else {
                    consumer.subscribe(topicNames);
                }
                Set<String> subscribedTopics = consumer.subscription();
                System.out.println("Using subscribe() api, got " + subscribedTopics.size() + " from subscription()");
                if (subscribedTopics.size() != expectedTopics) {
                    throw new IOException("Subscription failed. Expected:" + expectedTopics + " found:" + subscribedTopics.size());
                }
            } else {
                // NOTE(review): assign() is issued twice; presumably deliberate, to exercise
                // repeated assignment of the same partitions - confirm before removing.
                consumer.assign(partitions);
                consumer.assign(partitions);
                Set<String> subscribedTopics = consumer.subscription();
                System.out.println("Using assign() api, got " + subscribedTopics.size() + " from subscription()");
                if (!subscribedTopics.isEmpty()) {
                    throw new IOException("Subscription failed.  Should not have topic subscription when using assign() api");
                }
            }

            Set<TopicPartition> assignment = consumer.assignment();
            if (assignment.size() != expectedPartitions) {
                throw new IOException("Subscription failed. Expected:" + expectedPartitions + " found:" + assignment.size());
            }
            System.out.println("Subscription successful:" + assignment.size());
            for (TopicPartition topicPartition : assignment) {
                long position = consumer.position(topicPartition);
                if (this.isTracingEnabled) {
                    System.out.println("Subscribed to " + topicPartition.topic() + " partition:" + topicPartition.partition() + " position:" + position + "(" + this.numExpectedMsgs + ")");
                }
                consumer.seekToBeginning(new TopicPartition[]{topicPartition});
            }

            PerfStats perfStats = new PerfStats();
            if (this.action != TED_ACTION.kNone) {
                tedThread = new Thread(new TedThread(this.action, this));
                tedThread.start();
            }

            // Poll until two consecutive polls come back empty.
            int emptyPolls = 0;
            do {
                if (this.isTracingEnabled) {
                    System.out.println("pollsWithMissingMsgs " + emptyPolls);
                }
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000L);
                emptyPolls = records.count() == 0 ? emptyPolls + 1 : 0;
                VerifyAndAddStats(records, perfStats);
            } while (emptyPolls < 2);
            if (this.verifyKeys) {
                VerifyPollingEnd();
            }
            if (this.printStats) {
                perfStats.printReport();
            }
            this.status = true;

            System.out.println("committing offsets");
            consumer.commitSync();
            System.out.println("returning from commit offset");
            if (this.groupId != null && topicsPerStream < 10) {
                System.out.println("verifying list cursors");
                for (int streamIdx = 0; streamIdx < this.numStreams; streamIdx++) {
                    for (int topicIdx = 0; topicIdx < topicsPerStream; topicIdx++) {
                        List<CursorInfo> cursors = this.madmin.listCursors(this.streamName + streamIdx, this.groupId, topicIntial + topicIdx, -1);
                        if (cursors.size() != this.numPartitions) {
                            throw new IOException("Expected " + this.numPartitions + " cursorInfo structure returned " + cursors.size());
                        }
                        for (CursorInfo cursorInfo : cursors) {
                            // partitions is laid out stream-major, then topic, then partition.
                            int flatIndex = (streamIdx * topicsPerStream * this.numPartitions) + (topicIdx * this.numPartitions) + cursorInfo.feedId();
                            long committedOffset = consumer.committed(partitions.get(flatIndex)).offset();
                            if (committedOffset != cursorInfo.cursor()) {
                                throw new IOException("read offset from disk:" + cursorInfo.cursor() + " queryied:" + committedOffset);
                            }
                        }
                    }
                }
                System.out.println("verifying list cursors done");
            }

            // Position must equal the committed offset, and seekToBeginning must rewind to 0.
            long[] committedOffsets = new long[assignment.size()];
            int slot = 0;
            for (TopicPartition topicPartition : assignment) {
                committedOffsets[slot] = consumer.committed(topicPartition).offset();
                long position = consumer.position(topicPartition);
                if (position != committedOffsets[slot]) {
                    System.out.println("Postion is :" + position + " committed:" + committedOffsets[slot]);
                    throw new IOException("position is not what was expected");
                }
                consumer.seekToBeginning(new TopicPartition[]{topicPartition});
                if (consumer.position(topicPartition) != 0) {
                    throw new IOException("seekToBeginning failed");
                }
                slot++;
            }

            // After seekToEnd no partition may sit below its committed offset.
            TopicPartition[] allPartitions = partitions.toArray(new TopicPartition[partitions.size()]);
            consumer.seekToEnd(allPartitions);
            slot = 0;
            for (TopicPartition topicPartition : assignment) {
                long endPosition = consumer.position(topicPartition);
                // Read the committed value once so the check and the log line use the same slot.
                long committedOffset = committedOffsets[slot];
                slot++;
                if (endPosition < committedOffset) {
                    System.out.println("Postion after seekToEnd is :" + endPosition + " committed:" + committedOffset);
                    throw new IOException("position is not what was expected, after seekToEnd");
                }
            }

            // Deleting every topic must drain the assignment.
            for (String fullTopic : topicNames) {
                String[] streamAndTopic = fullTopic.split(":");
                this.madmin.deleteTopic(streamAndTopic[0], streamAndTopic[1]);
            }
            awaitAssignmentSize(consumer, 0);
            if (consumer.assignment().size() != 0) {
                throw new IOException("Found subscription after topic delete");
            }

            // Recreating the topics with the same partition count must restore it.
            for (String fullTopic : topicNames) {
                String[] streamAndTopic = fullTopic.split(":");
                this.madmin.createTopic(streamAndTopic[0], streamAndTopic[1], this.numPartitions);
            }
            awaitAssignmentSize(consumer, expectedPartitions);
            Set<TopicPartition> recreatedAssignment = consumer.assignment();
            if (recreatedAssignment.size() != expectedPartitions) {
                throw new IOException("Found subscription after topic delete");
            }
            // A recreated topic must not carry over a cursor. A KafkaException raised by
            // committed() is not handled here and propagates to the caller.
            for (TopicPartition topicPartition : recreatedAssignment) {
                long committedOffset = consumer.committed(topicPartition) != null ? consumer.committed(topicPartition).offset() : 0L;
                if (committedOffset != 0) {
                    throw new IOException("Cursor found for a recreated topic");
                }
            }

            // Delete again, then recreate with half the partitions.
            for (String fullTopic : topicNames) {
                String[] streamAndTopic = fullTopic.split(":");
                this.madmin.deleteTopic(streamAndTopic[0], streamAndTopic[1]);
            }
            if (this.topicSubscription && this.groupId != null) {
                rebCb.WaitForAllUnsubscribed();
            }
            for (String fullTopic : topicNames) {
                String[] streamAndTopic = fullTopic.split(":");
                this.madmin.createTopic(streamAndTopic[0], streamAndTopic[1], this.numPartitions / 2);
            }
            int expectedHalved = expectedTopics * (this.numPartitions / 2);
            awaitAssignmentSize(consumer, expectedHalved);
            Set<TopicPartition> halvedAssignment = consumer.assignment();
            if (halvedAssignment.size() != expectedHalved) {
                throw new IOException("Found " + halvedAssignment.size() + " subscription after topic delete Expected:" + expectedHalved);
            }

            System.out.println("Unsubscribing");
            consumer.unsubscribe();
            if (this.topicSubscription) {
                if (this.groupId != null) {
                    rebCb.WaitForAllUnsubscribed();
                }
            } else if (consumer.assignment().size() != 0) {
                throw new IOException("Unsubscribe failed");
            }
            System.out.println("The test completed successfully");
        } finally {
            // Release the consumer and wait for the TED thread whether or not the run succeeded.
            if (consumer != null) {
                consumer.close();
            }
            if (tedThread != null) {
                try {
                    tedThread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Sleeps in 150 ms steps, at most five times, until the consumer's assignment reaches the
     * expected size. Returns silently on timeout; callers re-check the size themselves.
     * If the thread is interrupted the interrupt flag is restored and the wait ends early.
     */
    private static void awaitAssignmentSize(Consumer<?, ?> consumer, int expectedSize) {
        for (int attemptsLeft = 5; attemptsLeft > 0; attemptsLeft--) {
            try {
                Thread.sleep(150L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (consumer.assignment().size() == expectedSize) {
                return;
            }
        }
    }

    /**
     * Deletes the first stream (the stream name prefix followed by {@code 0}) through the
     * admin handle. Any failure is printed and otherwise ignored.
     */
    public void DeleteStream() {
        final String firstStream = this.streamName + 0;
        try {
            this.madmin.deleteStream(firstStream);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Edits the first stream (the stream name prefix followed by {@code 0}) twice: first
     * setting its consume permissions, then its admin permissions, both to
     * {@code "u:0 | g:0"}. Any failure is printed and otherwise ignored.
     */
    public void ChangePerm() {
        final String firstStream = this.streamName + 0;
        try {
            StreamDescriptor consumeEdit = Streams.newStreamDescriptor();
            System.out.println("Change listen perms for " + firstStream);
            consumeEdit.setConsumePerms("u:0 | g:0");
            this.madmin.editStream(firstStream, consumeEdit);

            StreamDescriptor adminEdit = Streams.newStreamDescriptor();
            System.out.println("Change admin perms for " + firstStream);
            adminEdit.setAdminPerms("u:0 | g:0");
            this.madmin.editStream(firstStream, adminEdit);
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Verifies one polled batch and feeds its size into the performance statistics.
     *
     * <p>For every record the key and value lengths are added to the batch byte total. When key
     * verification is enabled the key must be UTF-8 text of the form
     * {@code stream:topic:partition:seq}; stream, topic and partition are checked against the
     * record and the configuration, the sequence number is checked by the ordered or unordered
     * verifier, and the bytes fetched per partition must not exceed the configured fetch limit.
     *
     * @param consumerRecords batch returned by a single poll
     * @param perfStats accumulator that receives the batch byte total and record count
     * @throws IOException if a record fails verification (the underlying failure is attached
     *         as the cause) or a partition exceeded the fetch limit
     */
    private void VerifyAndAddStats(ConsumerRecords<byte[], byte[]> consumerRecords, PerfStats perfStats) throws IOException {
        long batchBytes = 0;
        Hashtable<PartitionInfo, Long> bytesPerPartition = this.verifyKeys ? new Hashtable<PartitionInfo, Long>() : null;
        for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
            if (this.isTracingEnabled) {
                System.out.println(consumerRecord);
            }
            try {
                byte[] keyBytes = consumerRecord.key();
                long recordBytes = keyBytes.length + consumerRecord.value().length;
                batchBytes += recordBytes;
                if (this.verifyKeys) {
                    String key = new String(keyBytes, "UTF-8");
                    String[] keyParts = key.split(":");
                    if (keyParts.length != 4) {
                        throw new IOException("Key " + key + " not of correct format");
                    }
                    int keyPartition = Integer.parseInt(keyParts[2]);
                    int keySeq = Integer.parseInt(keyParts[3]);
                    if (this.isTracingEnabled) {
                        System.out.println("Key " + key + " ntokens " + keyParts.length);
                    }
                    // The stream token must be the configured prefix plus one index character
                    // whose value is below numStreams.
                    if (keyParts[0].length() != this.streamName.length() + 1 || !keyParts[0].startsWith(this.streamName) || Integer.parseInt(keyParts[0].substring(this.streamName.length())) >= this.numStreams) {
                        throw new IOException("StreamName in key " + keyParts[0] + " didnot match intial " + this.streamName + " and numstreams " + this.numStreams);
                    }
                    String[] recordTopicParts = consumerRecord.topic().split(":");
                    if (recordTopicParts.length != 2 || !keyParts[1].equals(recordTopicParts[1])) {
                        throw new IOException("Topic in key " + keyParts[1] + " mismatched. Expected " + recordTopicParts[1]);
                    }
                    if (keyPartition != consumerRecord.partition()) {
                        throw new IOException("Partition in key " + keyPartition + " mismatched. Expected " + consumerRecord.partition());
                    }
                    if (this.keysInOrder) {
                        VerifyPollingOrderedKey(keyParts[0], keyParts[1], keyPartition, keySeq);
                    } else {
                        VerifyPollingUnorderedKey(keyParts[0], keyParts[1], keyPartition, keySeq);
                    }
                    AddPerPartitonRecs(bytesPerPartition, keyParts[0], keyParts[1], keyPartition, recordBytes);
                }
            } catch (Exception e) {
                // Keep the generic message callers already see, but chain the real failure so
                // the detailed verification message is not lost.
                throw new IOException("ConsumerRecord Exception", e);
            }
        }
        if (this.verifyKeys) {
            for (Long partitionBytes : bytesPerPartition.values()) {
                if (partitionBytes.longValue() > this.maxPartitionFetchSize) {
                    System.out.println("Total msgbytes fetched " + partitionBytes + " is greater than " + this.maxPartitionFetchSize);
                    throw new IOException("Total msgbytes fetched " + partitionBytes + " is greater than " + this.maxPartitionFetchSize);
                }
            }
        }
        perfStats.report(batchBytes, consumerRecords.count());
    }

    /**
     * Checks that a sequence number is the direct successor of the last one recorded for the
     * partition (or 0 when nothing has been recorded yet) and records it as the new last value.
     *
     * @param str stream name taken from the key
     * @param str2 topic name taken from the key
     * @param i partition number
     * @param i2 sequence number of the current record
     * @throws IOException if the sequence number is not the expected one
     */
    private void VerifyPollingOrderedKey(String str, String str2, int i, int i2) throws IOException {
        PartitionInfo partitionKey = new PartitionInfo(str, str2, i);
        Integer lastSeen = this.partitionSeqMap.get(partitionKey);
        int expectedSeq = (lastSeen == null) ? 0 : lastSeen.intValue() + 1;
        if (i2 == expectedSeq) {
            this.partitionSeqMap.put(partitionKey, Integer.valueOf(i2));
            return;
        }
        throw new IOException("Current Seq " + i2 + " for Stream " + str + " Topic " + str2 + " partition " + i + " mismatched. Expected " + expectedSeq);
    }

    /**
     * Adds a record's byte count to the running total kept for its partition, creating the
     * entry when the partition has not been seen before.
     *
     * @param hashtable per-partition byte totals, updated in place
     * @param str stream name taken from the key
     * @param str2 topic name taken from the key
     * @param i partition number
     * @param j number of bytes to add
     */
    private void AddPerPartitonRecs(Hashtable<PartitionInfo, Long> hashtable, String str, String str2, int i, long j) {
        PartitionInfo partitionKey = new PartitionInfo(str, str2, i);
        Long runningTotal = hashtable.get(partitionKey);
        long updatedTotal = (runningTotal == null) ? j : j + runningTotal.longValue();
        hashtable.put(partitionKey, Long.valueOf(updatedTotal));
    }

    /**
     * Marks a sequence number as received for the partition, in any order. The first record of
     * a partition allocates a bitmap with one slot per expected message.
     *
     * @param str stream name taken from the key
     * @param str2 topic name taken from the key
     * @param i partition number
     * @param i2 sequence number of the current record
     * @throws IOException if the sequence number lies outside the bitmap, or if it was already
     *         received and duplicate keys are not allowed
     */
    private void VerifyPollingUnorderedKey(String str, String str2, int i, int i2) throws IOException {
        PartitionInfo partitionKey = new PartitionInfo(str, str2, i);
        boolean[] received = this.partitionBArrayMap.get(partitionKey);
        if (received == null) {
            received = new boolean[this.numExpectedMsgs];
            this.partitionBArrayMap.put(partitionKey, received);
        }
        // Report an out-of-range sequence number explicitly rather than letting the array
        // access fail with an index error that hides which record was at fault.
        if (i2 < 0 || i2 >= received.length) {
            throw new IOException("Seq " + i2 + " for Stream " + str + " Topic " + str2 + " partition " + i + " is out of range. Expected 0 to " + (received.length - 1));
        }
        if (received[i2] && !this.allowDuplicateKeys) {
            throw new IOException("Duplicate key for Stream " + str + " Topic " + str2 + " partition " + i + ". seq is " + i2);
        }
        received[i2] = true;
    }

    /**
     * Returns the number of messages expected for a topic. The numeric suffix after the
     * {@code topicIntial} prefix is the topic index: indices below {@code numTopics} expect the
     * full message count, all others expect one thousandth of it (integer division).
     *
     * @param str topic name consisting of the {@code topicIntial} prefix and a decimal index
     * @return expected message count for that topic
     */
    private int NumExpectedMsgs(String str) {
        int topicIndex = Integer.parseInt(str.substring(topicIntial.length()));
        if (topicIndex < this.numTopics) {
            return this.numExpectedMsgs;
        }
        return this.numExpectedMsgs / 1000;
    }

    /**
     * Final check after polling has stopped: every expected partition must have been seen and
     * must have received all of its messages.
     *
     * <p>In ordered mode the last sequence number recorded for each partition must equal the
     * topic's expected message count minus one. In unordered mode every slot of each
     * partition's bitmap, up to {@code numExpectedMsgs}, must be set.
     *
     * @throws IOException if a partition is missing or incomplete
     */
    private void VerifyPollingEnd() throws IOException {
        int expectedPartitions = this.numStreams * (this.numTopics + this.numSlowTopics) * this.numPartitions;

        if (!this.keysInOrder) {
            if (this.partitionBArrayMap.size() != expectedPartitions) {
                throw new IOException("Total entries in hashmap " + this.partitionBArrayMap.size() + ", expected " + expectedPartitions);
            }
            for (boolean[] received : this.partitionBArrayMap.values()) {
                for (int seq = 0; seq < this.numExpectedMsgs; seq++) {
                    if (!received[seq]) {
                        throw new IOException("Message with seq " + seq + " missing");
                    }
                }
            }
            return;
        }

        if (this.partitionSeqMap.size() != expectedPartitions) {
            throw new IOException("Total entries in hashmap " + this.partitionSeqMap.size() + ", expected " + expectedPartitions);
        }
        for (Map.Entry<PartitionInfo, Integer> lastSeqEntry : this.partitionSeqMap.entrySet()) {
            int expectedLastSeq = NumExpectedMsgs(lastSeqEntry.getKey().topicName()) - 1;
            if (lastSeqEntry.getValue().intValue() != expectedLastSeq) {
                throw new IOException(lastSeqEntry.getKey() + ", Last seq received " + lastSeqEntry.getValue() + ", expected " + expectedLastSeq);
            }
        }
    }
}
