/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.listener;

import java.io.IOException;
import java.lang.invoke.CallSite;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

public class ListenerPerformance
implements Runnable {
    private String streamName;
    private String streamNameToCheck;
    private int numTopics = 2;
    private int numPartitions = 4;
    private int numExpectedMsgs = 100000;
    private int numBatches = 1;
    private boolean verifyKeys = false;
    private boolean keysInOrder = false;
    private boolean allowDuplicateKeys = false;
    private boolean isTracingEnabled = false;
    private boolean printProgress = false;
    private boolean topicSubscriptions = false;
    private String groupId = null;
    private String clientId = null;
    private Hashtable<PartitionInfo, Integer> partitionSeqMap;
    private Hashtable<PartitionInfo, Long> partitionOffsetMap;
    private Hashtable<PartitionInfo, boolean[]> partitionBArrayMap;
    public boolean status;

    public static void usage() {
        System.err.println("ListenerPerformance -path <stream-full-name>");
        System.err.println("     [ -ntopics <num topics> (default: 2) ]");
        System.err.println("     [ -npart <numpartitions per topic> (default: 1) ]");
        System.err.println("     [ -nmsgs <num messages per topicfeed> (default: 100000) ]");
        System.err.println("     [ -group <consumer group id> (default: null) ]");
        System.err.println("     [ -topicsubscription <true/false> (default: false) ]");
        System.exit(1);
    }

    public static void main(String[] args) throws Exception {
        String streamName = null;
        int numTopics = 2;
        int numPartitions = 1;
        int numExpectedMsgs = 100000;
        int numBatches = 1;
        boolean verifyKeys = true;
        boolean keysInOrder = false;
        boolean allowDuplicateKeys = false;
        boolean isTracingEnabled = false;
        boolean printProgress = true;
        boolean topicSubscriptions = false;
        String groupId = null;
        String clientId = null;
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                streamName = args[i];
                continue;
            }
            if (args[i].equals("-ntopics")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                numTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-npart")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                numPartitions = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-nmsgs")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                numExpectedMsgs = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-verify")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                verifyKeys = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-keysinorder")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                keysInOrder = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-debug")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                isTracingEnabled = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-allowduplicates")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                allowDuplicateKeys = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-nbatches")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                numBatches = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-group")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                groupId = args[i];
                continue;
            }
            if (args[i].equals("-client")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                clientId = args[i];
                continue;
            }
            if (args[i].equals("-topicsubscription")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                topicSubscriptions = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-progress")) {
                if (++i >= args.length) {
                    ListenerPerformance.usage();
                }
                printProgress = Boolean.parseBoolean(args[i]);
                continue;
            }
            ListenerPerformance.usage();
        }
        ListenerPerformance lp = new ListenerPerformance(streamName, numTopics, numPartitions, numExpectedMsgs, numBatches, verifyKeys, keysInOrder, allowDuplicateKeys, isTracingEnabled, printProgress, groupId, clientId, topicSubscriptions);
        Thread lt = new Thread(lp);
        lt.start();
        lt.join();
        if (!lp.status) {
            System.out.println("ListenerPerformance test failed.");
        }
    }

    public ListenerPerformance(String streamName, int numTopics, int numPartitions, int numExpectedMsgs, int numBatches, boolean verifyKeys, boolean keysInOrder, boolean allowDuplicateKeys, boolean isTracingEnabled, boolean printProgress, String groupId, String clientId, boolean topicSubscriptions) {
        this.streamName = streamName;
        this.streamNameToCheck = streamName;
        this.numTopics = numTopics;
        this.numPartitions = numPartitions;
        this.numExpectedMsgs = numExpectedMsgs;
        this.numBatches = numBatches;
        this.verifyKeys = verifyKeys;
        this.keysInOrder = keysInOrder;
        this.allowDuplicateKeys = allowDuplicateKeys;
        this.isTracingEnabled = isTracingEnabled;
        this.printProgress = printProgress;
        this.groupId = groupId;
        this.clientId = clientId;
        this.topicSubscriptions = topicSubscriptions;
    }

    public ListenerPerformance(String streamName, String streamNameToCheck, int numTopics, int numPartitions, int numExpectedMsgs, int numBatches, boolean verifyKeys, boolean keysInOrder, boolean allowDuplicateKeys, boolean isTracingEnabled, boolean printProgress, String groupId, String clientId, boolean topicSubscriptions) {
        this.streamName = streamName;
        this.streamNameToCheck = streamNameToCheck;
        this.numTopics = numTopics;
        this.numPartitions = numPartitions;
        this.numExpectedMsgs = numExpectedMsgs;
        this.numBatches = numBatches;
        this.verifyKeys = verifyKeys;
        this.keysInOrder = keysInOrder;
        this.allowDuplicateKeys = allowDuplicateKeys;
        this.isTracingEnabled = isTracingEnabled;
        this.printProgress = printProgress;
        this.groupId = groupId;
        this.clientId = clientId;
        this.topicSubscriptions = topicSubscriptions;
    }

    public void reportProgress(boolean enabled) {
        this.printProgress = enabled;
    }

    @Override
    public void run() {
        int pollsWithMissingMsgs = 0;
        this.status = false;
        try {
            ConsumerRecords recs;
            if (this.streamName == null || this.streamName.length() == 0) {
                System.err.println("stream name cannot be empty.");
                ListenerPerformance.usage();
            }
            if (this.keysInOrder) {
                this.partitionSeqMap = new Hashtable();
            } else {
                this.partitionBArrayMap = new Hashtable();
            }
            this.partitionOffsetMap = new Hashtable();
            Properties props = new Properties();
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("auto.offset.reset", "earliest");
            props.put("auto.commit.interval.ms", (Object)1000);
            props.put("fetch.min.bytes", (Object)1);
            props.put("max.partition.fetch.bytes", (Object)65536);
            if (this.groupId != null) {
                props.put("group.id", this.groupId);
            }
            if (this.clientId != null) {
                props.put("client.id", this.clientId);
            }
            ByteArrayDeserializer keyD = new ByteArrayDeserializer();
            ByteArrayDeserializer valueD = new ByteArrayDeserializer();
            KafkaConsumer listener = new KafkaConsumer(props, (Deserializer)keyD, (Deserializer)valueD);
            RebCb cb = new RebCb();
            ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
            ArrayList<CallSite> topicList = new ArrayList<CallSite>();
            for (int i = 0; i < this.numTopics; ++i) {
                String streamTopicName = this.streamName + ":topic" + i;
                topicList.add((CallSite)((Object)streamTopicName));
                for (int j = 0; j < this.numPartitions; ++j) {
                    partitions.add(new TopicPartition(streamTopicName, j));
                    if (!this.isTracingEnabled) continue;
                    System.out.println("Creating a subscription for " + streamTopicName + " feed:" + j);
                }
            }
            if (this.topicSubscriptions) {
                listener.subscribe(topicList, (ConsumerRebalanceListener)cb);
            } else {
                listener.assign(partitions);
            }
            System.out.println("Subscription successful");
            PerfStats stats = new PerfStats();
            int totalNumMsgsExpected = this.numTopics * this.numPartitions * this.numExpectedMsgs * this.numBatches;
            for (int totalNumMsgs = 0; totalNumMsgs < totalNumMsgsExpected; totalNumMsgs += this.VerifyAndAddStats((ConsumerRecords<byte[], byte[]>)recs, stats)) {
                if (this.isTracingEnabled) {
                    System.out.println("pollsWithMissingMsgs " + pollsWithMissingMsgs);
                }
                if ((recs = listener.poll(1000L)).count() == 0) {
                    ++pollsWithMissingMsgs;
                    continue;
                }
                pollsWithMissingMsgs = 0;
            }
            if (this.verifyKeys) {
                this.VerifyPollingEnd();
            }
            stats.printReport();
            Set subscribed = listener.assignment();
            while (subscribed.size() == 0) {
                try {
                    System.out.println("Assignment size " + subscribed.size());
                    Thread.sleep(1000L);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                subscribed = listener.assignment();
            }
            if (this.isTracingEnabled) {
                for (TopicPartition p : subscribed) {
                    System.out.println("Subscribed to " + p.topic() + " partition:" + p.partition());
                }
            }
            if (this.verifyKeys) {
                System.out.println("committing offsets");
                listener.commitSync();
                System.out.println("Subscription size " + subscribed.size());
                for (TopicPartition p : subscribed) {
                    long offset = listener.committed(p).offset();
                    String[] topicNameParts = p.topic().split(":");
                    PartitionInfo pinfo = new PartitionInfo(this.streamNameToCheck, topicNameParts[1], p.partition());
                    System.out.println("partition check: " + topicNameParts[0] + " " + topicNameParts[1]);
                    Long mappedoffset = this.partitionOffsetMap.get(pinfo);
                    if (mappedoffset + 1L == offset) continue;
                    System.out.println("Commit offset for  " + p.topic() + " partition " + p.partition() + ":" + offset + " expected:" + mappedoffset);
                    throw new IOException("unexpected commit offset");
                }
            }
            listener.close();
            this.status = true;
        }
        catch (IOException e) {
            System.out.println(e);
        }
    }

    private int VerifyAndAddStats(ConsumerRecords<byte[], byte[]> recs, PerfStats stats) throws IOException {
        Iterator iter = recs.iterator();
        long numBytes = 0L;
        long maxLag = 0L;
        long totalLag = 0L;
        int numMsgs = 0;
        long now = System.currentTimeMillis();
        while (iter.hasNext()) {
            long offset;
            ConsumerRecord rec = (ConsumerRecord)iter.next();
            if (this.isTracingEnabled) {
                System.out.println(rec);
            }
            byte[] key = null;
            byte[] value = null;
            try {
                key = (byte[])rec.key();
                value = (byte[])rec.value();
                ByteBuffer buf = ByteBuffer.wrap(value);
                long producerTime = buf.getLong();
                long lag = now - producerTime;
                if (this.isTracingEnabled) {
                    String keyStr = new String(key, "UTF-8");
                    System.out.println("Producer Time " + producerTime + " lag " + lag + " for key " + keyStr);
                }
                if (lag > maxLag) {
                    maxLag = lag;
                }
                totalLag += lag;
                offset = rec.offset();
            }
            catch (Exception e) {
                throw new IOException("ConsumerRecord Exception");
            }
            ++numMsgs;
            numBytes += (long)key.length;
            numBytes += (long)value.length;
            if (this.verifyKeys) {
                String keyStr = new String(key, "UTF-8");
                String[] tokens = keyStr.split(":");
                if (tokens.length != 4) {
                    throw new IOException("Key " + keyStr + " not of correct format");
                }
                int partition = Integer.parseInt(tokens[2]);
                int seq = Integer.parseInt(tokens[3]);
                if (this.isTracingEnabled) {
                    System.out.println("Key " + keyStr + " ntokens " + tokens.length + " bytes " + (key.length + value.length) + " offset " + offset);
                }
                if (!tokens[0].equals(this.streamNameToCheck)) {
                    throw new IOException("streamName in key " + tokens[0] + " not same as " + this.streamName);
                }
                String recTopic = rec.topic();
                String[] recTopicTokens = recTopic.split(":");
                if (recTopicTokens.length != 2 || !tokens[1].equals(recTopicTokens[1])) {
                    throw new IOException("topic in key " + tokens[1] + " mismatched. Expected " + recTopicTokens[1]);
                }
                if (partition != rec.partition()) {
                    throw new IOException("partition in key " + partition + " mismatched. Expected " + rec.partition());
                }
                if (this.keysInOrder) {
                    this.VerifyPollingOrderedKey(tokens[0], tokens[1], partition, seq, offset);
                } else {
                    this.VerifyPollingUnorderedKey(tokens[0], tokens[1], partition, seq, offset);
                }
            }
            if (!this.isTracingEnabled) continue;
            System.out.println("Value " + new String(value, "UTF-8"));
        }
        stats.report(numBytes, recs.count(), maxLag, totalLag, now);
        return numMsgs;
    }

    private void VerifyPollingOrderedKey(String stName, String tpName, int partition, int seq, long offset) throws IOException {
        PartitionInfo pinfo = new PartitionInfo(stName, tpName, partition);
        Integer mappedSeq = this.partitionSeqMap.get(pinfo);
        int expSeq = 0;
        if (mappedSeq != null) {
            expSeq = mappedSeq + 1;
        }
        if (seq != expSeq) {
            throw new IOException("Current Seq " + seq + " for Stream " + stName + " Topic " + tpName + " partition " + partition + " mismatched. Expected " + expSeq);
        }
        this.partitionSeqMap.put(pinfo, seq);
        Long mappedoffset = this.partitionOffsetMap.get(pinfo);
        if (mappedoffset != null && mappedoffset > offset) {
            throw new IOException("Got out of order offsets");
        }
        this.partitionOffsetMap.put(pinfo, offset);
    }

    private void VerifyPollingUnorderedKey(String stName, String tpName, int partition, int seq, long offset) throws IOException {
        PartitionInfo pinfo = new PartitionInfo(stName, tpName, partition);
        boolean[] bitArray = this.partitionBArrayMap.get(pinfo);
        if (bitArray == null) {
            bitArray = new boolean[this.numExpectedMsgs * this.numBatches];
            this.partitionBArrayMap.put(pinfo, bitArray);
        }
        if (bitArray[seq] && !this.allowDuplicateKeys) {
            throw new IOException("Duplicate key for Stream " + stName + " Topic " + tpName + " partition " + partition + ". seq is " + seq);
        }
        bitArray[seq] = true;
        Long mappedoffset = this.partitionOffsetMap.get(pinfo);
        if (mappedoffset != null && mappedoffset > offset) {
            throw new IOException("Got out of order offsets");
        }
        this.partitionOffsetMap.put(pinfo, offset);
    }

    private void VerifyPollingEnd() throws IOException {
        int totalPartitions = this.numTopics * this.numPartitions;
        if (this.keysInOrder) {
            int lastExpectedSeq = this.numExpectedMsgs * this.numBatches - 1;
            if (this.partitionSeqMap.size() != totalPartitions) {
                throw new IOException("Total entries in hashmap " + this.partitionSeqMap.size() + ", expected " + totalPartitions);
            }
            Set<Map.Entry<PartitionInfo, Integer>> set = this.partitionSeqMap.entrySet();
            for (Map.Entry<PartitionInfo, Integer> entry : set) {
                if (entry.getValue() == lastExpectedSeq) continue;
                throw new IOException(entry.getKey() + ", Last seq received " + entry.getValue() + ", expected " + lastExpectedSeq);
            }
        } else {
            if (this.partitionBArrayMap.size() != totalPartitions) {
                throw new IOException("Total entries in hashmap " + this.partitionBArrayMap.size() + ", expected " + totalPartitions);
            }
            Set<Map.Entry<PartitionInfo, boolean[]>> set = this.partitionBArrayMap.entrySet();
            for (Map.Entry<PartitionInfo, boolean[]> entry : set) {
                boolean[] barray = entry.getValue();
                for (int i = 0; i < this.numExpectedMsgs * this.numBatches; ++i) {
                    if (barray[i]) continue;
                    throw new IOException("Message with seq " + i + " missing");
                }
            }
        }
    }

    private class PartitionInfo {
        private final String streamName;
        private final String topicName;
        private final int partitionId;

        public PartitionInfo(String streamName, String topicName, int partitionId) {
            this.streamName = streamName;
            this.topicName = topicName;
            this.partitionId = partitionId;
        }

        public boolean equals(Object anObject) {
            PartitionInfo pinfo;
            if (this == anObject) {
                return true;
            }
            return anObject instanceof PartitionInfo && this.streamName.equals((pinfo = (PartitionInfo)anObject).streamName()) && this.topicName.equals(pinfo.topicName()) && this.partitionId == pinfo.partitionId();
        }

        public int hashCode() {
            return this.topicName.hashCode() + this.partitionId;
        }

        public String toString() {
            return "Stream: " + this.streamName + " Topic: " + this.topicName + " Partition: " + this.partitionId;
        }

        public String streamName() {
            return this.streamName;
        }

        public String topicName() {
            return this.topicName;
        }

        public int partitionId() {
            return this.partitionId;
        }
    }

    private final class PerfStats {
        private long startTime;
        private long endTime = -1L;
        private long totalBytes = 0L;
        private long totalMsgs = 0L;
        private long maxLag = 0L;
        private long totalLag = 0L;
        private long lastProgressTime = this.startTime = System.currentTimeMillis();
        private long lastProgressMsgs = 0L;
        private int nsecs = 0;

        public synchronized void report(long bytes, long numMsgs, long maxLag, long totalLag, long now) {
            this.totalLag += totalLag;
            this.totalBytes += bytes;
            this.totalMsgs += numMsgs;
            if (maxLag > this.maxLag) {
                this.maxLag = maxLag;
            }
            if (ListenerPerformance.this.printProgress && now - this.lastProgressTime >= 1000L) {
                SimpleDateFormat df = new SimpleDateFormat("dd/MM/yy HH:mm:ss");
                Date date = new Date();
                System.out.printf("%s %4d secs %9d msgs %8d msgs/s maxlag %d %n", df.format(date), this.nsecs, this.totalMsgs, this.totalMsgs - this.lastProgressMsgs, this.maxLag);
                this.lastProgressMsgs = this.totalMsgs;
                this.lastProgressTime = now;
                ++this.nsecs;
            }
        }

        public synchronized void printReport() {
            this.endTime = System.currentTimeMillis();
            System.out.println("Total time (ms): " + (this.endTime - this.startTime));
            System.out.println("Total bytes received: " + this.totalBytes);
            System.out.println("Total messages received: " + this.totalMsgs);
            long bytesInKb = this.totalBytes / 1024L;
            System.out.println("Average nKBs/sec: " + (double)bytesInKb * 1.0 / (double)(this.endTime - this.startTime) * 1000.0);
            System.out.println("Average nMsgs/sec: " + (double)this.totalMsgs * 1.0 / (double)(this.endTime - this.startTime) * 1000.0);
            System.out.println("Average lag in ms: " + this.totalLag / this.totalMsgs);
            System.out.println("Maximum lag in ms: " + this.maxLag);
        }
    }

    private static final class RebCb
    implements ConsumerRebalanceListener {
        RebCb() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        }
    }
}

