/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.producer;

import com.mapr.fs.proto.Marlinserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerPerformance {
    // ---- Test configuration (static; set by main(), runStressTest() or runBasicTest()) ----

    // Full stream name (from -path); runTest() names topics streamName + ":topic" + i.
    public static String streamName;
    // Topic names built by runTest(); holds numTopics + numSlowTopics entries.
    public static String[] topicNames;
    // Number of topics that receive every message index.
    public static int numTopics;
    // Number of extra topics that only receive a message when i % slowToNormalTopicRatio == 0.
    public static int numSlowTopics;
    // Number of partition indices iterated per topic for each message index.
    public static int numPartitions;
    // Message indices produced per partition in each batch.
    public static int numMsgsPerPartition;
    // Number of batches; runTest() sleeps batchSleepMs between consecutive batches.
    public static int numBatches;
    // Pause between batches, in milliseconds (passed to Thread.sleep).
    public static long batchSleepMs;
    // Passed (as a string) to the producer under the key cdef.getParallelFlushersPerPartition().
    public static boolean multipleFlushers;
    // When true, PerfStats.report() prints a progress line at most once per 1000 ms.
    public static boolean printProgress;
    // When true, each key is "<topic>:<partition>:<index>" and each value embeds a timestamp plus the key.
    public static boolean needVerify;
    // When true, records are created with a null key and no explicit partition.
    public static boolean roundRobin;
    // When true (and roundRobin is false), records carry a key but no explicit partition.
    public static boolean hashKey;
    // Number of completion callbacks runTest() expects; computed at the start of runTest().
    public static int totalNumMsgs;
    // Producer instance created, flushed and closed by runTest().
    public static KafkaProducer producer;
    // Requested value size in bytes; the verify path enlarges it when the key does not fit.
    public static int msgValueLength;
    // Passed to the producer under the key cdef.getBufferMemory().
    public static long producerPoolSz;
    // A slow topic receives one message for every slowToNormalTopicRatio message indices.
    public static int slowToNormalTopicRatio;
    // When true, GetValueWithLag() writes the sampled send timestamp into the value.
    public static boolean checkLag;
    // When false, PerfCallback terminates the JVM on the first failed send.
    public static boolean ignoreErr;
    // Passed to the producer under the key cdef.getMetadataMaxAge().
    public static long metadataRefreshMs;
    // A send timestamp is sampled for message indices where i % numMsgsWithTS == 0.
    private static int numMsgsWithTS;
    // Value most recently returned by GetValueWithLag().
    private static byte[] inlineValue;
    // Shared zero-filled value used for messages that carry no timestamp.
    private static byte[] inlineValueDefault;

    /**
     * Prints the command-line synopsis to stderr and terminates the JVM with
     * exit status 1.
     *
     * <p>Every option accepted by {@link #main(String[])} is listed; the
     * defaults shown are the values assigned in the class's static
     * initializer. This method never returns normally.
     */
    public static void usage() {
        System.err.println("ProducerPerformance -path <stream-full-name>");
        System.err.println("     [ -ntopics <num topics> (default: 2) ]");
        System.err.println("     [ -npart <numpartitions per topic> (default: 1) ]");
        System.err.println("     [ -nmsgs <num messages per topicfeed> (default: 100000) ]");
        System.err.println("     [ -msgsz <msg value size> (default: 200) ]");
        System.err.println("     [ -rr <round robin true/false> (default: false) ]");
        System.err.println("     [ -hashkey <true/false> (default: false) ]");
        // Options below are parsed by main() but were missing from the synopsis.
        System.err.println("     [ -nslowtopics <num slow topics> (default: 0) ]");
        System.err.println("     [ -nbatches <num batches> (default: 1) ]");
        System.err.println("     [ -batchsleepms <sleep between batches in ms> (default: 10000) ]");
        System.err.println("     [ -multiflush <parallel flushers true/false> (default: true) ]");
        System.err.println("     [ -progress <print progress true/false> (default: true) ]");
        System.err.println("     [ -poolsz <producer buffer memory> (default: 33554432) ]");
        System.err.println("     [ -verify <true/false> (default: true) ]");
        System.err.println("     [ -checklag <true/false> (default: true) ]");
        System.err.println("     [ -ignoreerror <true/false> (default: false) ]");
        System.exit(1);
    }

    /**
     * Command-line entry point: parses {@code -option value} pairs into the
     * static configuration fields and then runs the test.
     *
     * <p>Every option takes exactly one value. An unknown option, an option
     * without a value, or a numeric option whose value cannot be parsed
     * prints the usage text (which exits the JVM with status 1).
     *
     * @param args command-line arguments as {@code -option value} pairs
     * @throws IOException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; i += 2) {
            String option = args[i];
            if (i + 1 >= args.length) {
                // The last option has no value following it.
                ProducerPerformance.usage();
                return;
            }
            String value = args[i + 1];
            try {
                if (option.equals("-path")) {
                    streamName = value;
                } else if (option.equals("-nmsgs")) {
                    numMsgsPerPartition = Integer.parseInt(value);
                } else if (option.equals("-ntopics")) {
                    numTopics = Integer.parseInt(value);
                } else if (option.equals("-npart")) {
                    numPartitions = Integer.parseInt(value);
                } else if (option.equals("-multiflush")) {
                    multipleFlushers = Boolean.parseBoolean(value);
                } else if (option.equals("-nbatches")) {
                    numBatches = Integer.parseInt(value);
                } else if (option.equals("-batchsleepms")) {
                    batchSleepMs = Long.parseLong(value);
                } else if (option.equals("-progress")) {
                    printProgress = Boolean.parseBoolean(value);
                } else if (option.equals("-msgsz")) {
                    msgValueLength = Integer.parseInt(value);
                } else if (option.equals("-poolsz")) {
                    producerPoolSz = Long.parseLong(value);
                } else if (option.equals("-nslowtopics")) {
                    numSlowTopics = Integer.parseInt(value);
                } else if (option.equals("-verify")) {
                    needVerify = Boolean.parseBoolean(value);
                } else if (option.equals("-rr")) {
                    roundRobin = Boolean.parseBoolean(value);
                } else if (option.equals("-hashkey")) {
                    hashKey = Boolean.parseBoolean(value);
                } else if (option.equals("-checklag")) {
                    checkLag = Boolean.parseBoolean(value);
                } else if (option.equals("-ignoreerror")) {
                    ignoreErr = Boolean.parseBoolean(value);
                } else {
                    // Unrecognised option.
                    ProducerPerformance.usage();
                    return;
                }
            } catch (NumberFormatException e) {
                // Report the bad value instead of dying with a raw stack trace.
                System.err.println("Invalid numeric value \"" + value + "\" for option " + option);
                ProducerPerformance.usage();
                return;
            }
        }
        ProducerPerformance.runTest();
    }

    /**
     * Programmatic entry point for a stress run: copies the arguments into the
     * static configuration and delegates to {@link #runTest()}.
     *
     * <p>Slow topics are disabled and the value size is fixed at 200 bytes.
     * Settings that have no corresponding parameter keep whatever value they
     * currently hold.
     *
     * @param stream      full stream name
     * @param nmsgs       messages per partition per batch
     * @param ntopics     number of topics
     * @param npartitions partitions per topic
     * @param nbatches    number of batches
     * @param sleepms     pause between batches, in milliseconds
     * @param verify      whether to build verifiable keys and values
     * @param mflushers   value for the parallel-flushers producer property
     * @param progress    whether to print periodic progress lines
     * @param roundrobin  send records with a null key and no partition
     * @param hashkey     send records with a key and no partition
     * @return the result of {@link #runTest()}
     * @throws IOException declared for compatibility with existing callers
     */
    public static boolean runStressTest(String stream, int nmsgs, int ntopics, int npartitions, int nbatches, long sleepms, boolean verify, boolean mflushers, boolean progress, boolean roundrobin, boolean hashkey) throws IOException {
        // Where to write.
        streamName = stream;
        numTopics = ntopics;
        numSlowTopics = 0;
        numPartitions = npartitions;

        // How much to write.
        numMsgsPerPartition = nmsgs;
        numBatches = nbatches;
        batchSleepMs = sleepms;
        msgValueLength = 200;

        // How to write it.
        multipleFlushers = mflushers;
        roundRobin = roundrobin;
        hashKey = hashkey;
        needVerify = verify;
        printProgress = progress;

        return runTest();
    }

    /**
     * Programmatic entry point for a basic run: copies the arguments into the
     * static configuration, forces verification on, and delegates to
     * {@link #runTest()}.
     *
     * <p>Settings that have no corresponding parameter (for example
     * {@code roundRobin} and {@code hashKey}) keep whatever value they
     * currently hold.
     *
     * @param stream         full stream name
     * @param nmsgs          messages per partition per batch
     * @param ntopics        number of normal topics
     * @param slowTopics     number of slow topics
     * @param npartitions    partitions per topic
     * @param nbatches       number of batches
     * @param sleepms        pause between batches, in milliseconds
     * @param mflushers      value for the parallel-flushers producer property
     * @param progress       whether to print periodic progress lines
     * @param msgsz          requested value size in bytes
     * @param poolsz         value for the buffer-memory producer property
     * @param metadataMillis value for the metadata-max-age producer property
     * @return the result of {@link #runTest()}
     * @throws IOException declared for compatibility with existing callers
     */
    public static boolean runBasicTest(String stream, int nmsgs, int ntopics, int slowTopics, int npartitions, int nbatches, long sleepms, boolean mflushers, boolean progress, int msgsz, long poolsz, long metadataMillis) throws IOException {
        // Where to write.
        streamName = stream;
        numTopics = ntopics;
        numSlowTopics = slowTopics;
        numPartitions = npartitions;

        // How much to write.
        numMsgsPerPartition = nmsgs;
        numBatches = nbatches;
        batchSleepMs = sleepms;
        msgValueLength = msgsz;

        // Producer tuning.
        multipleFlushers = mflushers;
        producerPoolSz = poolsz;
        metadataRefreshMs = metadataMillis;

        // Basic runs always verify.
        needVerify = true;
        printProgress = progress;

        return runTest();
    }

    /**
     * Runs the producer test using the current static configuration.
     *
     * <p>For each batch, every message index is sent to every partition of
     * every normal topic; slow topics only receive the indices that are a
     * multiple of {@code slowToNormalTopicRatio}. Each send registers a
     * {@code PerfCallback} that feeds a {@code PerfStats} collector. After the
     * last batch the producer is flushed and closed, the callback count is
     * compared with the expected total, and a report is printed.
     *
     * <p>Invalid configuration (empty stream name, non-positive partition or
     * topic count) prints an error followed by the usage text, which exits the
     * JVM.
     *
     * @return {@code true} when the number of completion callbacks equals the
     *         expected number of messages
     */
    public static boolean runTest() {
        if (streamName == null || streamName.length() == 0) {
            System.err.println("stream name cannot be empty.");
            ProducerPerformance.usage();
        }
        if (numPartitions <= 0) {
            System.err.println("num partitions cannot be negative or zero.");
            ProducerPerformance.usage();
        }
        if (numTopics <= 0) {
            System.err.println("num topics cannot be negative or zero.");
            // Previously fell through and kept running with an invalid topic count.
            ProducerPerformance.usage();
        }

        // The send loop emits a slow-topic message for every index i in
        // [0, numMsgsPerPartition) with i % ratio == 0, i.e. ceil(n / ratio)
        // messages. Plain integer division under-counts whenever n is not a
        // multiple of the ratio and makes verification fail spuriously.
        int slowMsgsPerPartition = numMsgsPerPartition / slowToNormalTopicRatio;
        if (numMsgsPerPartition % slowToNormalTopicRatio > 0) {
            ++slowMsgsPerPartition;
        }
        totalNumMsgs = numMsgsPerPartition * numPartitions * numTopics * numBatches;
        totalNumMsgs += slowMsgsPerPartition * numPartitions * numSlowTopics * numBatches;

        topicNames = new String[numTopics + numSlowTopics];
        for (int i = 0; i < numTopics + numSlowTopics; ++i) {
            ProducerPerformance.topicNames[i] = streamName + ":topic" + i;
        }

        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), Boolean.toString(multipleFlushers));
        props.put(cdef.getBufferTime(), (Object)3000);
        props.put(cdef.getMetadataMaxAge(), (Object)metadataRefreshMs);
        props.put(cdef.getBufferMemory(), (Object)producerPoolSz);

        long sleepTime = 0L;
        boolean interrupted = false;
        byte[] key = null;
        int keySz = 0;
        if (!needVerify) {
            // Fixed zero-filled key, reused for every record unless hashKey replaces it.
            key = new byte[30];
            keySz = key.length;
        }

        producer = new KafkaProducer(props);
        PerfStats stats = new PerfStats(totalNumMsgs);
        for (int batchIdx = 0; batchIdx < numBatches; ++batchIdx) {
            int msgIdx = batchIdx * numMsgsPerPartition;
            for (int i = 0; i < numMsgsPerPartition; ++i) {
                for (int topicIdx = 0; topicIdx < numTopics + numSlowTopics; ++topicIdx) {
                    // Slow topics (indices >= numTopics) only get every ratio-th message.
                    if (topicIdx >= numTopics && i % slowToNormalTopicRatio != 0) continue;
                    for (int partIdx = 0; partIdx < numPartitions; ++partIdx) {
                        // -1 means "no send timestamp sampled for this message".
                        long curTime = -1L;
                        if (i % numMsgsWithTS == 0) {
                            curTime = System.currentTimeMillis();
                        }

                        if (needVerify || hashKey) {
                            int keyIdx = msgIdx + i;
                            key = (topicNames[topicIdx] + ":" + partIdx + ":" + keyIdx).getBytes();
                            keySz = key.length;
                        }

                        byte[] value;
                        if (needVerify) {
                            // Verifiable payload: 8-byte timestamp, then the key, then 'a' padding.
                            int bufLen = msgValueLength;
                            if (bufLen < keySz + 10) {
                                bufLen = keySz + 10;
                            }
                            value = new byte[bufLen];
                            ByteBuffer valueBuffer = ByteBuffer.wrap(value);
                            if (curTime == -1L) {
                                curTime = System.currentTimeMillis();
                            }
                            valueBuffer.putLong(curTime);
                            valueBuffer.put(key);
                            while (valueBuffer.position() <= bufLen - 2) {
                                valueBuffer.putChar('a');
                            }
                        } else {
                            value = ProducerPerformance.GetValueWithLag(curTime);
                        }

                        // Account for what is actually handed to the producer: the
                        // real payload length (the verify path may enlarge it) and
                        // no key bytes for round-robin records, which carry a null key.
                        long sendBytes = (long)value.length + (roundRobin ? 0 : keySz);
                        PerfCallback cb = new PerfCallback(curTime, sendBytes, stats);

                        ProducerRecord record;
                        if (roundRobin) {
                            assert (!needVerify);
                            record = new ProducerRecord(topicNames[topicIdx], null, (Object)value);
                        } else if (hashKey) {
                            assert (!needVerify);
                            record = new ProducerRecord(topicNames[topicIdx], (Object)key, (Object)value);
                        } else {
                            record = new ProducerRecord(topicNames[topicIdx], Integer.valueOf(partIdx), (Object)key, (Object)value);
                        }
                        producer.send(record, (Callback)cb);
                    }
                }
            }
            if (batchIdx >= numBatches - 1) continue;
            long sleepStart = System.currentTimeMillis();
            try {
                Thread.sleep(batchSleepMs);
            }
            catch (InterruptedException e) {
                // Keep running the test, but remember the interrupt so it can be
                // restored once the producer has been flushed and closed.
                interrupted = true;
            }
            sleepTime += System.currentTimeMillis() - sleepStart;
        }
        producer.flush();
        producer.close();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        boolean verify = stats.checkAndVerify();
        stats.printReport(numTopics, numPartitions, sleepTime);
        return verify;
    }

    /**
     * Returns the value bytes for a message sent without verification.
     *
     * <p>When lag checking is enabled and a timestamp was sampled
     * ({@code curTime != -1}), a fresh buffer is returned whose first 8 bytes
     * hold {@code curTime}. Otherwise a shared zero-filled buffer is returned.
     *
     * <p>The shared buffer is re-allocated whenever its length no longer
     * matches {@code msgValueLength}; it is first created during class
     * initialisation, before any configured message size is applied. A
     * timestamped buffer is never smaller than 8 bytes so the timestamp always
     * fits.
     *
     * @param curTime sampled send time in milliseconds, or {@code -1} if none
     * @return the value to send; the same array is also stored in {@code inlineValue}
     */
    private static byte[] GetValueWithLag(long curTime) {
        if (checkLag && curTime != -1L) {
            // putLong needs 8 bytes; a smaller buffer would throw BufferOverflowException.
            inlineValue = new byte[Math.max(msgValueLength, 8)];
            ByteBuffer.wrap(inlineValue).putLong(curTime);
        } else {
            if (inlineValueDefault == null || inlineValueDefault.length != msgValueLength) {
                // Configured size changed since the default buffer was allocated.
                inlineValueDefault = new byte[msgValueLength];
            }
            inlineValue = inlineValueDefault;
        }
        return inlineValue;
    }

    // Default configuration, applied once when the class is loaded.
    static {
        // Topology.
        numTopics = 2;
        numSlowTopics = 0;
        numPartitions = 1;
        slowToNormalTopicRatio = 1000;

        // Load shape.
        numMsgsPerPartition = 100000;
        numBatches = 1;
        batchSleepMs = 10000L;
        msgValueLength = 200;
        numMsgsWithTS = 1000;

        // Producer tuning.
        multipleFlushers = true;
        producerPoolSz = 32L * 1024L * 1024L;
        metadataRefreshMs = 300000L;

        // Record routing.
        roundRobin = false;
        hashKey = false;

        // Checking and reporting.
        needVerify = true;
        checkLag = true;
        ignoreErr = false;
        printProgress = true;

        // Value buffers; the default buffer is sized from msgValueLength set above.
        inlineValue = null;
        inlineValueDefault = new byte[msgValueLength];
    }

    private static final class PerfCallback
    implements Callback {
        private final long start;
        private final long bytes;
        private static boolean printStack = true;
        private PerfStats stats;

        public PerfCallback(long start, long bytes, PerfStats stats) {
            this.start = start;
            this.bytes = bytes;
            this.stats = stats;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int)(now - this.start);
            this.stats.report(now, latency, this.bytes);
            if (exception != null) {
                if (printStack) {
                    exception.printStackTrace();
                    printStack = false;
                }
                if (!ignoreErr) {
                    System.exit(1);
                }
            } else {
                printStack = true;
            }
        }
    }

    private static final class PerfStats {
        private long startTime;
        private long endTime;
        private long minLatency;
        private long maxLatency;
        private long totalLatency;
        private long totalBytes;
        private long msgCount;
        private long totalMsgsToExpect;
        private long lastProgressTime;
        private long lastProgressMsgs;
        private int nsecs;

        public PerfStats(int totalMsgs) {
            this.totalMsgsToExpect = totalMsgs;
            this.minLatency = 999999L;
            this.maxLatency = 0L;
            this.totalBytes = 0L;
            this.msgCount = 0L;
            this.totalLatency = 0L;
            this.endTime = -1L;
            this.lastProgressTime = this.startTime = System.currentTimeMillis();
            this.lastProgressMsgs = 0L;
            this.nsecs = 0;
        }

        public synchronized boolean checkAndVerify() {
            if (this.totalMsgsToExpect != this.msgCount) {
                System.out.println("***** Verification failed! " + this.totalMsgsToExpect + " != " + this.msgCount + " *****");
                return false;
            }
            return true;
        }

        public synchronized void report(long now, long latency, long bytes) {
            this.minLatency = Math.min(this.minLatency, latency);
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.totalBytes += bytes;
            ++this.msgCount;
            this.totalLatency += latency;
            if (printProgress && now - this.lastProgressTime >= 1000L) {
                SimpleDateFormat df = new SimpleDateFormat("dd/MM/yy HH:mm:ss");
                Date date = new Date();
                System.out.printf("%s %4d secs %9d msgs %8d msgs/s %n", df.format(date), this.nsecs, this.msgCount, this.msgCount - this.lastProgressMsgs);
                this.lastProgressMsgs = this.msgCount;
                this.lastProgressTime = now;
                ++this.nsecs;
            }
            if (this.msgCount == this.totalMsgsToExpect) {
                this.endTime = System.currentTimeMillis();
            }
        }

        public synchronized void printReport(int numTopics, int numPartitions, long sleepTime) {
            System.out.println("***** Producer Report Start *****");
            System.out.println("nTopics/nPartitions: " + numTopics + "/" + numPartitions);
            System.out.println("Expected nMsgs: " + this.totalMsgsToExpect);
            System.out.println("Callback nMsgs: " + this.msgCount);
            if (this.endTime == -1L) {
                this.endTime = System.currentTimeMillis();
            }
            long elapsedTime = this.endTime - this.startTime;
            long runTime = this.endTime - this.startTime - sleepTime;
            System.out.println("Total time (ms): " + elapsedTime);
            System.out.println("Total sleep time (ms): " + sleepTime);
            System.out.println("Total run time (ms): " + runTime);
            System.out.println("Total bytes sent: " + this.totalBytes);
            System.out.println("Min/Max latency (ms): " + this.minLatency + "/" + this.maxLatency);
            System.out.println("Average latency (ms): " + this.totalLatency / this.msgCount);
            System.out.println("Average nMsgs/sec: " + (double)this.msgCount * 1.0 / (double)runTime * 1000.0);
            System.out.println("Average nKBs/sec: " + (double)(this.totalBytes / runTime) * 1000.0 / 1024.0);
            System.out.println("***** Producer Report End *****");
        }
    }
}

