/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.demo;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class DemoProducer {
    public static String[] streamNames;
    public static boolean multipleFlushers;
    public static int load;
    public static KafkaProducer producer;
    public static String producerConfig;

    public static void usage() {
        System.err.println("DemoProducer -path <topic-full-name> [-load <Kmsg/sec>] [-producerConfig configFile]");
        System.exit(1);
    }

    public static void main(String[] args) throws IOException {
        for (int i = 0; i < args.length; ++i) {
            if (args[i].equals("-path")) {
                if (++i >= args.length) {
                    DemoProducer.usage();
                }
                streamNames = args[i].split(",");
                System.out.println("Starting producer on " + args[i]);
                continue;
            }
            if (args[i].equals("-load")) {
                if (++i >= args.length) {
                    DemoProducer.usage();
                }
                load = Integer.parseInt(args[i]);
                System.out.println("Setting load to " + (load *= 1000) + "msgs/sec");
                continue;
            }
            if (args[i].equals("-producerConfig")) {
                if (++i >= args.length) {
                    DemoProducer.usage();
                }
                producerConfig = args[i];
                continue;
            }
            DemoProducer.usage();
        }
        if (streamNames == null || streamNames.length == 0) {
            DemoProducer.usage();
        }
        Properties props = new Properties();
        if (producerConfig != null) {
            props.load(new FileInputStream(producerConfig));
        }
        if (props.getProperty("bootstrap.servers") == null) {
            props.put("bootstrap.servers", "localhost:9092");
        }
        if (props.getProperty("key.serializer") == null) {
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        if (props.getProperty("value.serializer") == null) {
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        if (props.getProperty("streams.parallel.flushers.per.partition") != null) {
            props.put("streams.parallel.flushers.per.partition", (Object)multipleFlushers);
        }
        producer = new KafkaProducer(props);
        DemoProducer.produceTransactions();
    }

    public static void produceTransactions() {
        String goodLog = "This is a valid transaction";
        byte[] goodTransaction = goodLog.getBytes();
        Random streamNameRand = new Random();
        Random rand = new Random();
        int randomNum = rand.nextInt(3 * load);
        ++randomNum;
        ProducerCallback cb = new ProducerCallback();
        long i = 0L;
        System.out.println("There are " + streamNames.length + " streams");
        try {
            while (true) {
                long startTime = System.currentTimeMillis();
                for (int j = 0; j < load; ++j) {
                    int streamId = streamNameRand.nextInt(streamNames.length);
                    String keyStr = String.format("43%02d 3344 2341 %04d", streamId, i++ % 1000L);
                    byte[] key = keyStr.getBytes();
                    byte[] value = goodTransaction;
                    if (i % (long)randomNum == 0L) {
                        long currTime = System.currentTimeMillis();
                        String badLog = currTime + ": Suspicious transaction";
                        value = badLog.getBytes();
                        randomNum = rand.nextInt(load);
                        ++randomNum;
                        System.out.println(currTime + ":" + keyStr + ":Introduced a bad" + " transaction in " + streamNames[streamId]);
                    }
                    ProducerRecord record = new ProducerRecord(streamNames[streamId], null, (Object)key, (Object)value);
                    producer.send(record, (Callback)cb);
                }
                long endTime = startTime + 1000L;
                long currTime = System.currentTimeMillis();
                if (endTime <= currTime) continue;
                Thread.sleep(endTime - currTime);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }

    static {
        multipleFlushers = false;
        load = 500;
    }

    private static final class ProducerCallback
    implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
                System.exit(1);
            }
        }
    }
}

