/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.streams.tools;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public class MirrorMaker {
    // Number of partition workers started by main(). Starts at 4 and is overwritten by
    // verifyAndPossiblyCreateDstTopic() with the partition count it finds.
    public static int nPartitions = 4;
    // Topic to read from; set from the -src command-line option.
    public static String srcTopicName;
    // Topic to write to; set from the -dst command-line option.
    public static String dstTopicName;
    // Path of an optional consumer properties file (-consumerConfig); null when not given.
    public static String consumerConfig;
    // Path of an optional producer properties file (-producerConfig); null when not given.
    public static String producerConfig;
    // Value of the -stopOnError option; the static initializer sets it to true.
    public static boolean stopOnErr;
    // Consumer configuration built by initConsumer().
    public static Properties consumerProps;
    // Producer configuration built by initProducer().
    public static Properties producerProps;

    /**
     * Writes the command-line synopsis to standard error and terminates the JVM
     * with exit status 1. This method does not return normally.
     */
    public static void usage() {
        final String synopsis = "MirrorMaker -src <topic-full-name> -dst <topic-full-name> [-consumerConfig ccfile -producerConfig pcfile -stopOnError true|false]";
        System.err.println(synopsis);
        System.exit(1);
    }

    /**
     * Entry point. Parses the command line, builds the consumer and producer
     * configuration, checks the source/destination partition counts and then
     * starts one {@code PartitionWorker} thread per partition, each with its own
     * producer and consumer. Blocks until every worker thread has terminated.
     *
     * @param args command-line options; see {@link #usage()} for the accepted syntax
     * @throws IOException if a configuration file cannot be read
     */
    public static void main(String[] args) throws IOException {
        System.out.println("Running mirror maker");
        MirrorMaker.parseArgs(args);
        if (srcTopicName == null || dstTopicName == null) {
            MirrorMaker.usage();
        }
        MirrorMaker.initConsumer();
        MirrorMaker.initProducer();
        MirrorMaker.verifyAndPossiblyCreateDstTopic();
        Thread[] partitionWorkers = new Thread[nPartitions];
        for (int i = 0; i < nPartitions; ++i) {
            // Each worker owns a dedicated producer/consumer pair; nothing is shared across threads.
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
            PartitionWorker worker = new PartitionWorker(producer, consumer, srcTopicName, dstTopicName, i, stopOnErr);
            partitionWorkers[i] = new Thread(worker);
            partitionWorkers[i].start();
        }
        try {
            for (Thread workerThread : partitionWorkers) {
                workerThread.join();
            }
        }
        catch (InterruptedException e) {
            System.out.println("***** Joining partitionWorkers failed *****");
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the {@code -option value} pairs from the command line into the static
     * settings of this class. An unknown option, or an option without a value,
     * prints the usage text and exits via {@link #usage()}.
     */
    private static void parseArgs(String[] args) {
        for (int idx = 0; idx < args.length; idx += 2) {
            String option = args[idx];
            if (idx + 1 >= args.length) {
                // Every option takes exactly one value, so a trailing lone token is an error.
                MirrorMaker.usage();
                return;
            }
            String value = args[idx + 1];
            switch (option) {
                case "-src":
                    srcTopicName = value;
                    break;
                case "-dst":
                    dstTopicName = value;
                    break;
                case "-consumerConfig":
                    consumerConfig = value;
                    break;
                case "-producerConfig":
                    producerConfig = value;
                    break;
                case "-stopOnError":
                    stopOnErr = Boolean.parseBoolean(value);
                    break;
                default:
                    MirrorMaker.usage();
                    return;
            }
        }
    }

    /**
     * Compares the partition counts of the source and destination topics and
     * stores the resulting worker count in {@link #nPartitions}.
     *
     * <ul>
     *   <li>If looking up the destination partitions throws, the source partition
     *       count is used. This method itself never creates the destination topic.
     *       NOTE(review): presumably the topic is created on first send - confirm.</li>
     *   <li>If both lookups succeed but the counts differ, a message is printed and
     *       the JVM exits with status 1.</li>
     *   <li>If the source partition list cannot be obtained, a message is printed
     *       and the JVM exits with status 1.</li>
     * </ul>
     *
     * The temporary producer and consumer used for the lookup are closed on every
     * path that returns or throws.
     */
    public static void verifyAndPossiblyCreateDstTopic() {
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
        try {
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerProps);
            try {
                List<?> srcPartitions = consumer.partitionsFor(srcTopicName);
                if (srcPartitions == null) {
                    // Without source metadata there is nothing to mirror; fail with a clear message, not an NPE.
                    System.out.println("Could not fetch partition information for source topic " + srcTopicName);
                    System.exit(1);
                    return;
                }
                List<?> dstPartitions;
                try {
                    dstPartitions = producer.partitionsFor(dstTopicName);
                }
                catch (Exception e) {
                    // Deliberate best effort: a failed destination lookup is taken to mean the
                    // destination does not exist yet, so mirror the source layout.
                    nPartitions = srcPartitions.size();
                    return;
                }
                if (dstPartitions.size() != srcPartitions.size()) {
                    System.out.println("Source topic and destination topic do not have equal number of partitions");
                    System.exit(1);
                }
                nPartitions = dstPartitions.size();
            }
            finally {
                consumer.close();
            }
        }
        finally {
            producer.close();
        }
    }

    /**
     * Builds {@link #producerProps}. Properties are loaded from the file named by
     * {@link #producerConfig} when one was given. Defaults are then filled in for
     * {@code bootstrap.servers} and the key/value serializers (byte-array) where
     * the file did not set them.
     *
     * @throws IOException if the configuration file cannot be opened or read
     */
    public static void initProducer() throws IOException {
        producerProps = new Properties();
        if (producerConfig != null) {
            // try-with-resources closes the file handle, even when load() fails.
            try (FileInputStream in = new FileInputStream(producerConfig)) {
                producerProps.load(in);
            }
        }
        if (producerProps.getProperty("bootstrap.servers") == null) {
            producerProps.put("bootstrap.servers", "localhost:9092");
        }
        if (producerProps.getProperty("key.serializer") == null) {
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        if (producerProps.getProperty("value.serializer") == null) {
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        // NOTE(review): this override only fires when the property IS present in the file, and it
        // stores a Boolean rather than a String. Confirm whether "== null" (apply as a default)
        // was intended; behaviour is kept as-is here.
        if (producerProps.getProperty("streams.parallel.flushers.per.partition") != null) {
            producerProps.put("streams.parallel.flushers.per.partition", Boolean.FALSE);
        }
    }

    /**
     * Builds {@link #consumerProps}. Properties are loaded from the file named by
     * {@link #consumerConfig} when one was given. Defaults are then filled in for
     * {@code bootstrap.servers} and the key/value deserializers (byte-array) where
     * the file did not set them.
     *
     * @throws IOException if the configuration file cannot be opened or read
     */
    public static void initConsumer() throws IOException {
        consumerProps = new Properties();
        if (consumerConfig != null) {
            // try-with-resources closes the file handle, even when load() fails.
            try (FileInputStream in = new FileInputStream(consumerConfig)) {
                consumerProps.load(in);
            }
        }
        if (consumerProps.getProperty("bootstrap.servers") == null) {
            consumerProps.put("bootstrap.servers", "localhost:9092");
        }
        if (consumerProps.getProperty("key.deserializer") == null) {
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
        if (consumerProps.getProperty("value.deserializer") == null) {
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
    }

    // Static defaults: stop on send errors unless told otherwise, and no consumer config file.
    static {
        stopOnErr = true;
        consumerConfig = null;
    }

    /**
     * Copies every record of one source-topic partition to the partition with the
     * same id in the destination topic. The worker assigns itself the source
     * partition, seeks to its beginning and then polls and forwards records in an
     * endless loop. The loop ends only when an exception is thrown, at which point
     * the producer and consumer are closed.
     */
    private static final class PartitionWorker
    implements Runnable {
        KafkaProducer<byte[], byte[]> producer;
        KafkaConsumer<byte[], byte[]> consumer;
        String srcTopicName;
        String dstTopicName;
        int partitionId;
        // Timeout passed to each consumer.poll() call.
        int pollTimeout = 1000;
        boolean stopOnErr;

        /**
         * @param producer     producer used to write to the destination topic
         * @param consumer     consumer used to read from the source topic
         * @param srcTopicName topic to read from
         * @param dstTopicName topic to write to
         * @param partitionId  partition id used on both the source and destination side
         * @param stopOnError  handed to the send callback; when true a failed send exits the JVM
         */
        public PartitionWorker(KafkaProducer<byte[], byte[]> producer, KafkaConsumer<byte[], byte[]> consumer, String srcTopicName, String dstTopicName, int partitionId, boolean stopOnError) {
            this.consumer = consumer;
            this.producer = producer;
            this.srcTopicName = srcTopicName;
            this.dstTopicName = dstTopicName;
            this.partitionId = partitionId;
            // Previously a self-assignment, which left the flag permanently false.
            this.stopOnErr = stopOnError;
        }

        @Override
        public void run() {
            try {
                TopicPartition partition = new TopicPartition(this.srcTopicName, this.partitionId);
                List<TopicPartition> partsToSubscribe = new ArrayList<TopicPartition>();
                partsToSubscribe.add(partition);
                this.consumer.assign(partsToSubscribe);
                ProducerCallback cb = new ProducerCallback(this.stopOnErr);
                // Always start from the first available record of the partition.
                this.consumer.seekToBeginning(Collections.singletonList(partition));
                while (true) {
                    ConsumerRecords<byte[], byte[]> records = this.consumer.poll((long)this.pollTimeout);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        byte[] key = record.key();
                        byte[] value = record.value();
                        ProducerRecord<byte[], byte[]> outgoing = new ProducerRecord<byte[], byte[]>(this.dstTopicName, Integer.valueOf(this.partitionId), key, value);
                        this.producer.send(outgoing, cb);
                    }
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                System.out.println("PartitionWorker failed");
                try {
                    this.producer.close();
                }
                finally {
                    // Close the consumer even if closing the producer throws.
                    this.consumer.close();
                }
            }
        }
    }

    private static final class ProducerCallback
    implements Callback {
        boolean stopOnErr;

        public ProducerCallback(boolean stopOnErr) {
            this.stopOnErr = stopOnErr;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace();
                System.out.println("Send failed");
                if (this.stopOnErr) {
                    System.exit(1);
                }
            }
        }
    }
}

