/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.producer;

import com.mapr.fs.proto.Marlinserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MarlinProducerMultiThreadPerformance {
    public static String streamName;
    public static int numTopics;
    public static int numPartitions;
    public static int numMsgsPerPartition;
    public static int numBatches;
    public static long batchSleepMs;
    public static boolean multipleFlushers;
    public static boolean printProgress;
    public static int totalNumMsgs;
    public static KafkaProducer producer;
    public static final int MSG_VALUE_LENGTH = 200;

    public static void usage() {
        System.err.println("MarlinProducerMultiThreadPerformance -path <stream-full-name> [-ntopics <num topics>");
        System.err.println(" -npart <numpartitions per topic> -nmsgs <num messages per topicfeed> -multiflush <enable multiple flushers>");
        System.err.println(" -nbatches <number of batches> -batchsleepms <milliseconds to sleep between batches> ]");
        System.exit(1);
    }

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                streamName = args[i];
                continue;
            }
            if (args[i].equals("-nmsgs")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                numMsgsPerPartition = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-ntopics")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                numTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-npart")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                numPartitions = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-multiflush")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                multipleFlushers = Boolean.parseBoolean(args[i]);
                continue;
            }
            if (args[i].equals("-nbatches")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                numBatches = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-batchsleepms")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                batchSleepMs = Long.parseLong(args[i]);
                continue;
            }
            if (args[i].equals("-progress")) {
                if (++i >= args.length) {
                    MarlinProducerMultiThreadPerformance.usage();
                }
                printProgress = Boolean.parseBoolean(args[i]);
                continue;
            }
            MarlinProducerMultiThreadPerformance.usage();
        }
        MarlinProducerMultiThreadPerformance.runTest();
    }

    public static boolean runStressTest(String stream, int nmsgs, int ntopics, int npartitions, int nbatches, long sleepms, boolean mflushers, boolean progress) throws IOException {
        streamName = stream;
        numMsgsPerPartition = nmsgs;
        numTopics = ntopics;
        numPartitions = npartitions;
        multipleFlushers = mflushers;
        numBatches = nbatches;
        batchSleepMs = sleepms;
        printProgress = progress;
        return MarlinProducerMultiThreadPerformance.runTest();
    }

    public static boolean runTest() {
        int i;
        if (streamName == null || streamName.length() == 0) {
            System.err.println("stream name cannot be empty.");
            MarlinProducerMultiThreadPerformance.usage();
        }
        if (numPartitions <= 0) {
            System.err.println("num partitions cannot be negative or zero.");
            MarlinProducerMultiThreadPerformance.usage();
        }
        if (numTopics <= 0) {
            System.err.println("num topics cannot be negative or zero.");
        }
        totalNumMsgs = numMsgsPerPartition * numPartitions * numTopics * numBatches;
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), (Object)multipleFlushers);
        props.put(cdef.getBufferTime(), (Object)3000);
        props.put(cdef.getMetadataMaxAge(), (Object)300000);
        long sleepTime = 0L;
        producer = new KafkaProducer(props);
        PerfStats stats = new PerfStats(totalNumMsgs);
        Thread[] partitionSenders = new Thread[numTopics * numPartitions];
        for (i = 0; i < numTopics; ++i) {
            String topicName = streamName + ":topic" + i;
            for (int j = 0; j < numPartitions; ++j) {
                PartitionSender currentPartSender = new PartitionSender(producer, stats, topicName, j, numBatches, numMsgsPerPartition, batchSleepMs);
                partitionSenders[i * MarlinProducerMultiThreadPerformance.numPartitions + j] = new Thread(currentPartSender);
                partitionSenders[i * numPartitions + j].start();
            }
        }
        try {
            for (i = 0; i < partitionSenders.length; ++i) {
                partitionSenders[i].join();
            }
        }
        catch (InterruptedException e) {
            System.out.println("***** Joining partitionSenders failed *****");
        }
        producer.close();
        boolean verify = stats.checkAndVerify();
        stats.printReport(numTopics, numPartitions, sleepTime);
        return verify;
    }

    static {
        numTopics = 2;
        numPartitions = 4;
        numMsgsPerPartition = 100000;
        numBatches = 1;
        batchSleepMs = 10000L;
        multipleFlushers = true;
        printProgress = true;
    }

    private static final class PerfCallback
    implements Callback {
        private final long start;
        private final long bytes;
        private PerfStats stats;

        public PerfCallback(long start, long bytes, PerfStats stats) {
            this.start = start;
            this.bytes = bytes;
            this.stats = stats;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int)(now - this.start);
            this.stats.report(now, latency, this.bytes);
            if (exception != null) {
                exception.printStackTrace();
                System.exit(1);
            }
        }
    }

    private static final class PerfStats {
        private long startTime;
        private long endTime;
        private long minLatency;
        private long maxLatency;
        private long totalLatency;
        private long totalBytes;
        private long msgCount;
        private long totalMsgsToExpect;
        private long lastProgressTime;
        private long lastProgressMsgs;
        private int nsecs;

        public PerfStats(int totalMsgs) {
            this.totalMsgsToExpect = totalMsgs;
            this.minLatency = 999999L;
            this.maxLatency = 0L;
            this.totalBytes = 0L;
            this.msgCount = 0L;
            this.totalLatency = 0L;
            this.endTime = -1L;
            this.lastProgressTime = this.startTime = System.currentTimeMillis();
            this.lastProgressMsgs = 0L;
            this.nsecs = 0;
        }

        public synchronized boolean checkAndVerify() {
            if (this.totalMsgsToExpect != this.msgCount) {
                System.out.println("***** Verification failed! " + this.totalMsgsToExpect + " != " + this.msgCount + " *****");
                return false;
            }
            return true;
        }

        public synchronized void report(long now, long latency, long bytes) {
            this.minLatency = Math.min(this.minLatency, latency);
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.totalBytes += bytes;
            ++this.msgCount;
            this.totalLatency += latency;
            if (printProgress && now - this.lastProgressTime >= 1000L) {
                SimpleDateFormat df = new SimpleDateFormat("dd/MM/yy HH:mm:ss");
                Date date = new Date();
                System.out.printf("%s %4d secs %9d msgs %8d msgs/s %n", df.format(date), this.nsecs, this.msgCount, this.msgCount - this.lastProgressMsgs);
                this.lastProgressMsgs = this.msgCount;
                this.lastProgressTime = now;
                ++this.nsecs;
            }
            if (this.msgCount == this.totalMsgsToExpect) {
                this.endTime = System.currentTimeMillis();
            }
        }

        public synchronized void printReport(int numTopics, int numPartitions, long sleepTime) {
            System.out.println("***** Producer Report Start *****");
            System.out.println("nTopics/nPartitions: " + numTopics + "/" + numPartitions);
            System.out.println("Expected nMsgs: " + this.totalMsgsToExpect);
            System.out.println("Callback nMsgs: " + this.msgCount);
            if (this.endTime == -1L) {
                this.endTime = System.currentTimeMillis();
            }
            long elapsedTime = this.endTime - this.startTime;
            long runTime = this.endTime - this.startTime - sleepTime;
            System.out.println("Total time (ms): " + elapsedTime);
            System.out.println("Total sleep time (ms): " + sleepTime);
            System.out.println("Total run time (ms): " + runTime);
            System.out.println("Total bytes sent: " + this.totalBytes);
            System.out.println("Min/Max latency (ms): " + this.minLatency + "/" + this.maxLatency);
            System.out.println("Average latency (ms): " + this.totalLatency / this.msgCount);
            System.out.println("Average nMsgs/sec: " + (double)this.msgCount * 1.0 / (double)runTime * 1000.0);
            System.out.println("Average nKBs/sec: " + (double)(this.totalBytes / runTime) * 1000.0 / 1024.0);
            System.out.println("***** Producer Report End *****");
        }
    }

    private static final class PartitionSender
    implements Runnable {
        private String topicName;
        private int partitionId;
        private String keyPrefix;
        private KafkaProducer producer;
        private PerfStats stats;
        private int numBatches;
        private int numMsgsPerBatch;
        private long sleepTime;

        public PartitionSender(KafkaProducer prod, PerfStats ss, String tn, int pid, int nb, int nmspb, long st) {
            this.topicName = tn;
            this.partitionId = pid;
            this.producer = prod;
            this.stats = ss;
            this.numBatches = nb;
            this.numMsgsPerBatch = nmspb;
            this.sleepTime = st;
            this.keyPrefix = this.topicName + ":" + this.partitionId;
        }

        @Override
        public void run() {
            for (int batchIdx = 0; batchIdx < this.numBatches; ++batchIdx) {
                int msgIdx = batchIdx * this.numMsgsPerBatch;
                for (int i = 0; i < numMsgsPerPartition; ++i) {
                    int keyIdx = msgIdx + i;
                    byte[] key = (this.keyPrefix + ":" + keyIdx).getBytes();
                    int keySz = key.length;
                    byte[] value = new byte[200];
                    ByteBuffer valueBuffer = ByteBuffer.wrap(value);
                    long curTime = System.currentTimeMillis();
                    valueBuffer.putLong(curTime);
                    valueBuffer.put(key);
                    while (valueBuffer.position() <= 198) {
                        valueBuffer.putChar('a');
                    }
                    long sendStart = System.currentTimeMillis();
                    long sendBytes = keySz + 200;
                    PerfCallback cb = new PerfCallback(sendStart, sendBytes, this.stats);
                    ProducerRecord record = new ProducerRecord(this.topicName, Integer.valueOf(this.partitionId), (Object)key, (Object)value);
                    this.producer.send(record, (Callback)cb);
                }
                if (batchIdx >= this.numBatches - 1) continue;
                long sleepStart = System.currentTimeMillis();
                try {
                    Thread.sleep(batchSleepMs);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                this.sleepTime += System.currentTimeMillis() - sleepStart;
            }
            this.producer.flush();
        }
    }
}

