/*
 * Decompiled with CFR 0.152.
 */
package kafka.examples;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import kafka.examples.Consumer;
import kafka.examples.ExactlyOnceMessageProcessor;
import kafka.examples.Producer;
import kafka.examples.Utils;

/**
 * Command-line driver that runs three stages in sequence against a broker at
 * {@link #BOOTSTRAP_SERVERS}: a {@link Producer} on the input topic, a set of
 * {@link ExactlyOnceMessageProcessor} instances between the input and output
 * topics, and a {@link Consumer} on the output topic.
 *
 * <p>Each stage is given a {@link CountDownLatch} and the driver waits up to
 * {@value #STAGE_TIMEOUT_MINUTES} minutes for it. If the wait times out, the
 * stage's threads are asked to shut down and the run stops.
 */
public class KafkaExactlyOnceDemo {
    /** Broker address used by every stage of the demo. */
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";
    public static final String GROUP_NAME = "check-group";
    /** Maximum time, in minutes, to wait on the latch of each stage. */
    private static final long STAGE_TIMEOUT_MINUTES = 2L;

    /**
     * Entry point.
     *
     * @param args exactly three integers: the number of partitions, the number
     *             of processor instances, and the total number of records. If
     *             the count is wrong or any value is not a valid integer, the
     *             usage help is printed and nothing else happens.
     */
    public static void main(String[] args) {
        try {
            if (args.length != 3) {
                printUsage();
                return;
            }
            int numPartitions;
            int numInstances;
            int numRecords;
            try {
                numPartitions = Integer.parseInt(args[0]);
                numInstances = Integer.parseInt(args[1]);
                numRecords = Integer.parseInt(args[2]);
            }
            catch (NumberFormatException e) {
                // A malformed number is a usage error, not a crash: show the help text.
                printUsage();
                return;
            }

            Utils.recreateTopics(BOOTSTRAP_SERVERS, numPartitions, INPUT_TOPIC, OUTPUT_TOPIC);

            // Stage 1: start the producer on the input topic and wait for its latch.
            CountDownLatch producerLatch = new CountDownLatch(1);
            Producer producerThread = new Producer("producer", BOOTSTRAP_SERVERS, INPUT_TOPIC, false, null, true, numRecords, -1, producerLatch);
            producerThread.start();
            if (!producerLatch.await(STAGE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for data load", new Object[0]);
                producerThread.shutdown();
                return;
            }

            // Stage 2: start one processor per requested instance; they share one latch
            // whose count equals the number of instances.
            CountDownLatch processorsLatch = new CountDownLatch(numInstances);
            List<ExactlyOnceMessageProcessor> processors = IntStream.range(0, numInstances)
                    .mapToObj(id -> new ExactlyOnceMessageProcessor("processor-" + id, BOOTSTRAP_SERVERS, INPUT_TOPIC, OUTPUT_TOPIC, processorsLatch))
                    .collect(Collectors.toList());
            processors.forEach(Thread::start);
            if (!processorsLatch.await(STAGE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for record copy", new Object[0]);
                processors.forEach(ExactlyOnceMessageProcessor::shutdown);
                return;
            }

            // Stage 3: start the consumer on the output topic and wait for its latch.
            CountDownLatch consumerLatch = new CountDownLatch(1);
            Consumer consumerThread = new Consumer("consumer", BOOTSTRAP_SERVERS, OUTPUT_TOPIC, GROUP_NAME, Optional.empty(), true, numRecords, consumerLatch);
            consumerThread.start();
            if (!consumerLatch.await(STAGE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
                Utils.printErr("Timeout after 2 minutes waiting for output read", new Object[0]);
                consumerThread.shutdown();
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code running after main's body can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        catch (Throwable e) {
            e.printStackTrace();
        }
    }

    /** Prints the command-line usage text through {@code Utils.printHelp}. */
    private static void printUsage() {
        Utils.printHelp("This example takes 3 parameters (i.e. 6 3 10000):%n- partition: number of partitions for input and output topics (required)%n- instances: number of application instances (required)%n- records: total number of records (required)", new Object[0]);
    }
}

