/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.producer;

import com.mapr.fs.proto.Marlinserver;
import com.mapr.streams.producer.ProducerRecordGenerator;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Producer {
    public static String streamName;
    public static int numStreams;
    public static int numTopics;
    public static int numSlowTopics;
    public static int numPartitions;
    public static int numHeaders;
    public static boolean sendHdrVal;
    public static int numMsgsPerPartition;
    public static boolean multipleFlushers;
    public static KafkaProducer producer;
    public static final int MSG_VALUE_LENGTH = 200;
    public static boolean printStats;
    public static ProducerRecordGenerator rgen;

    public static void usage() {
        System.err.println("Producer -path <stream-intialname> [-nstreams <num streams> -ntopics <num topics> -nslowtopics <num topics> -npart <numpartitions per topic> -nmsgs <num messages per topicfeed>]  -multiflush <enable multiple flushers>]");
        System.exit(1);
    }

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                streamName = args[i];
                continue;
            }
            if (args[i].equals("-nstreams")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                numStreams = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-nmsgs")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                numMsgsPerPartition = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-ntopics")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                numTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-nslowtopics")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                numSlowTopics = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-npart")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                numPartitions = Integer.parseInt(args[i]);
                continue;
            }
            if (args[i].equals("-multiflush")) {
                if (++i >= args.length) {
                    Producer.usage();
                }
                multipleFlushers = Boolean.parseBoolean(args[i]);
                continue;
            }
            Producer.usage();
        }
        Producer.run();
    }

    public static boolean runTest(String path, int nstreams, int ntopics, int nslowtopics, int npart, int nmsgs, boolean mflushers, int numHdrs, boolean hdrVal) {
        streamName = path;
        numStreams = nstreams;
        numMsgsPerPartition = nmsgs;
        numTopics = ntopics;
        numSlowTopics = nslowtopics;
        numPartitions = npart;
        printStats = false;
        multipleFlushers = mflushers;
        numHeaders = numHdrs;
        sendHdrVal = hdrVal;
        GenericHFactory producerFactory = new GenericHFactory();
        rgen = (ProducerRecordGenerator)GenericHFactory.getImplementorInstance((String)"com.mapr.streams.producer.ProducerRecordGeneratorV10", (Object[])new Object[0], (Class[])new Class[0]);
        return Producer.run();
    }

    public static boolean runTest(String path, int nstreams, int ntopics, int nslowtopics, int npart, int nmsgs, boolean mflushers, int numHdrs, boolean hdrVal, ProducerRecordGenerator recgen) {
        streamName = path;
        numStreams = nstreams;
        numMsgsPerPartition = nmsgs;
        numTopics = ntopics;
        numSlowTopics = nslowtopics;
        numPartitions = npart;
        printStats = false;
        multipleFlushers = mflushers;
        numHeaders = numHdrs;
        sendHdrVal = hdrVal;
        if (rgen == null) {
            rgen = recgen;
        }
        return Producer.run();
    }

    public static boolean runTest(String path, int nstreams, int ntopics, int nslowtopics, int npart, int nmsgs, boolean mflushers) {
        ProducerRecordGenerator rgen;
        int numHdrs = numHeaders;
        boolean hdrVal = sendHdrVal;
        GenericHFactory producerFactory = new GenericHFactory();
        if (Boolean.getBoolean("DISABLE_V10_TESTS")) {
            numHdrs = 0;
            hdrVal = false;
            rgen = (ProducerRecordGenerator)GenericHFactory.getImplementorInstance((String)"com.mapr.streams.producer.ProducerRecordGenerator", (Object[])new Object[0], (Class[])new Class[0]);
        } else {
            rgen = (ProducerRecordGenerator)GenericHFactory.getImplementorInstance((String)"com.mapr.streams.producer.ProducerRecordGeneratorV10", (Object[])new Object[0], (Class[])new Class[0]);
        }
        return Producer.runTest(path, nstreams, ntopics, nslowtopics, npart, nmsgs, mflushers, numHdrs, hdrVal, rgen);
    }

    public static boolean run() {
        int totalNumMsgs = 0;
        int totalNumTopics = numTopics + numSlowTopics;
        if (streamName == null || streamName.length() == 0) {
            System.err.println("stream name cannot be empty.");
            Producer.usage();
        }
        if (numPartitions <= 0) {
            System.err.println("num partitions cannot be negative or zero.");
            Producer.usage();
        }
        if (numTopics <= 0) {
            System.err.println("num topics cannot be negative or zero.");
        }
        totalNumMsgs = numMsgsPerPartition * numPartitions * numTopics * numStreams;
        totalNumMsgs += numMsgsPerPartition / 1000 * numPartitions * numSlowTopics * numStreams;
        String[] topicNames = new String[totalNumTopics * numStreams];
        for (int s = 0; s < numStreams; ++s) {
            for (int i = 0; i < totalNumTopics; ++i) {
                String topicName = streamName + s + ":topic" + i;
                topicNames[s * totalNumTopics + i] = topicName;
            }
        }
        Properties props = new Properties();
        Marlinserver.MarlinConfigDefaults cdef = Marlinserver.MarlinConfigDefaults.getDefaultInstance();
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(cdef.getParallelFlushersPerPartition(), (Object)multipleFlushers);
        props.put(cdef.getBufferTime(), (Object)1000);
        props.put(cdef.getMetadataMaxAge(), (Object)1000);
        producer = new KafkaProducer(props);
        PerfStats stats = new PerfStats(totalNumMsgs);
        for (int sIdx = 0; sIdx < numStreams; ++sIdx) {
            for (int topicIdx = 0; topicIdx < totalNumTopics; ++topicIdx) {
                for (int partIdx = 0; partIdx < numPartitions; ++partIdx) {
                    int msgIdx = 0;
                    for (int i = 0; i < numMsgsPerPartition; ++i) {
                        if (topicIdx >= numTopics && i % 1000 != 0) continue;
                        String topicName = topicNames[sIdx * totalNumTopics + topicIdx];
                        byte[] key = (topicName + ":" + partIdx + ":" + msgIdx).getBytes();
                        int keySz = key.length;
                        byte[] value = new byte[200];
                        ByteBuffer valueBuffer = ByteBuffer.wrap(value);
                        for (int j = 0; j < 200 / keySz; ++j) {
                            valueBuffer.put(key);
                        }
                        long sendStart = System.currentTimeMillis();
                        long sendBytes = keySz + 200;
                        PerfCallback cb = new PerfCallback(sendStart, sendBytes, stats);
                        ProducerRecord<byte[], byte[]> record = rgen.generateProducerRecord(partIdx, msgIdx, topicName, key, value, numHeaders, sendHdrVal);
                        producer.send(record, (Callback)cb);
                        ++msgIdx;
                    }
                }
            }
        }
        producer.flush();
        boolean verify = stats.checkAndVerify();
        producer.close();
        if (printStats) {
            stats.printReport();
        }
        return verify;
    }

    static {
        numStreams = 2;
        numTopics = 2;
        numSlowTopics = 2;
        numPartitions = 4;
        numHeaders = 5;
        sendHdrVal = true;
        numMsgsPerPartition = 100000;
        multipleFlushers = true;
        printStats = true;
        rgen = null;
    }

    private static final class PerfStats {
        private long startTime;
        private long endTime;
        private long minLatency;
        private long maxLatency;
        private long totalLatency;
        private long totalBytes;
        private long msgCount;
        private long totalMsgsToExpect;

        public PerfStats(int totalMsgs) {
            this.totalMsgsToExpect = totalMsgs;
            this.minLatency = 999999L;
            this.maxLatency = 0L;
            this.totalBytes = 0L;
            this.msgCount = 0L;
            this.totalLatency = 0L;
            this.endTime = -1L;
            this.startTime = System.currentTimeMillis();
        }

        public synchronized boolean checkAndVerify() {
            if (this.totalMsgsToExpect != this.msgCount) {
                System.out.println("***** Verification failed! *****");
                return false;
            }
            return true;
        }

        public synchronized void report(long latency, long bytes) {
            this.minLatency = Math.min(this.minLatency, latency);
            this.maxLatency = Math.max(this.maxLatency, latency);
            this.totalBytes += bytes;
            ++this.msgCount;
            this.totalLatency += latency;
            if (this.msgCount == this.totalMsgsToExpect) {
                this.endTime = System.currentTimeMillis();
            }
        }

        public synchronized void printReport() {
            System.out.println("Expected nMsgs: " + this.totalMsgsToExpect);
            System.out.println("Callback nMsgs: " + this.msgCount);
            if (this.endTime == -1L) {
                this.endTime = System.currentTimeMillis();
            }
            System.out.println("Total time (ms): " + (this.endTime - this.startTime));
            System.out.println("Total bytes sent: " + this.totalBytes);
            System.out.println("Min/Max latency (ms): " + this.minLatency + "/" + this.maxLatency);
            System.out.println("Average latency (ms): " + this.totalLatency / this.msgCount);
            System.out.println("Average nMsgs/sec: " + (double)this.msgCount * 1.0 / (double)(this.endTime - this.startTime) * 1000.0);
            System.out.println("Average nKBs/sec: " + (double)(this.totalBytes / (this.endTime - this.startTime)) * 1000.0 / 1024.0);
        }
    }

    private static final class PerfCallback
    implements Callback {
        private final long start;
        private final long bytes;
        private PerfStats stats;

        public PerfCallback(long start, long bytes, PerfStats stats) {
            this.start = start;
            this.bytes = bytes;
            this.stats = stats;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int)(now - this.start);
            this.stats.report(latency, this.bytes);
            if (exception != null) {
                exception.printStackTrace();
                System.exit(1);
            }
        }
    }
}

