package com.mapr.streams.tools;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:com/mapr/streams/tools/MirrorMaker.class */
public class MirrorMaker {
    /** Name of the topic to read from; set by {@code main} from the {@code -src} argument. */
    public static String srcTopicName;
    /** Name of the topic to write to; set by {@code main} from the {@code -dst} argument. */
    public static String dstTopicName;
    /** Path of an optional producer properties file ({@code -producerConfig}); {@code null} when not given. */
    public static String producerConfig;
    /** Consumer settings built by {@link #initConsumer()} and handed to every {@code KafkaConsumer} created here. */
    public static Properties consumerProps;
    /** Producer settings built by {@link #initProducer()} and handed to every {@code KafkaProducer} created here. */
    public static Properties producerProps;
    /**
     * Number of worker threads started by {@code main}, one per partition id in
     * {@code [0, nPartitions)}. The initial value 4 is replaced by
     * {@link #verifyAndPossiblyCreateDstTopic()} from the topics' partition metadata.
     */
    public static int nPartitions = 4;
    /** Path of an optional consumer properties file ({@code -consumerConfig}); {@code null} when not given. */
    public static String consumerConfig = null;
    /** Value of the {@code -stopOnError} argument (default {@code true}); passed to each {@code PartitionWorker}. */
    public static boolean stopOnErr = true;

    /* loaded from: input_file:com/mapr/streams/tools/MirrorMaker$PartitionWorker.class */
    /**
     * Copies one partition of the source topic to the partition with the same id in the
     * destination topic.
     *
     * <p>The worker assigns its consumer to exactly one source partition, rewinds to the
     * beginning of that partition and then forwards every polled record to the producer until
     * something throws. The producer and consumer passed in are owned by the worker and are
     * closed when {@link #run()} ends.
     */
    private static final class PartitionWorker implements Runnable {
        /** Upper bound, in milliseconds, that a single {@code poll} call may block. */
        private static final int POLL_TIMEOUT_MS = 1000;

        private final KafkaProducer<byte[], byte[]> producer;
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final String srcTopicName;
        private final String dstTopicName;
        private final int partitionId;
        /** Forwarded to the {@link ProducerCallback}: whether a failed send terminates the JVM. */
        private final boolean stopOnErr;

        /**
         * @param kafkaProducer producer used to publish to the destination topic
         * @param kafkaConsumer consumer used to read from the source topic
         * @param str           source topic name
         * @param str2          destination topic name
         * @param i             partition id to mirror (same id is used on both topics)
         * @param z             whether a failed send should stop the process
         */
        public PartitionWorker(KafkaProducer<byte[], byte[]> kafkaProducer, KafkaConsumer<byte[], byte[]> kafkaConsumer, String str, String str2, int i, boolean z) {
            this.consumer = kafkaConsumer;
            this.producer = kafkaProducer;
            this.srcTopicName = str;
            this.dstTopicName = str2;
            this.partitionId = i;
            // Previously never stored, so the flag was always false regardless of -stopOnError.
            this.stopOnErr = z;
        }

        /**
         * Runs the copy loop. The loop has no normal exit; it ends only when the consumer or
         * producer throws, at which point the failure is reported and both clients are closed.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                TopicPartition sourcePartition = new TopicPartition(this.srcTopicName, this.partitionId);
                List<TopicPartition> assignment = new ArrayList<TopicPartition>();
                assignment.add(sourcePartition);
                this.consumer.assign(assignment);
                ProducerCallback sendCallback = new ProducerCallback(this.stopOnErr);
                // Always start from the first record of the partition.
                this.consumer.seekToBeginning(new TopicPartition[]{sourcePartition});
                while (true) {
                    for (ConsumerRecord<byte[], byte[]> polled : this.consumer.poll(POLL_TIMEOUT_MS)) {
                        this.producer.send(new ProducerRecord<byte[], byte[]>(this.dstTopicName, Integer.valueOf(this.partitionId), polled.key(), polled.value()), sendCallback);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("PartitionWorker failed");
            } finally {
                // Close the consumer even if closing the producer throws.
                try {
                    this.producer.close();
                } finally {
                    this.consumer.close();
                }
            }
        }
    }

    /* loaded from: input_file:com/mapr/streams/tools/MirrorMaker$ProducerCallback.class */
    private static final class ProducerCallback implements Callback {
        boolean stopOnErr;

        public ProducerCallback(boolean z) {
            this.stopOnErr = z;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                exc.printStackTrace();
                System.out.println("Send failed");
                if (this.stopOnErr) {
                    System.exit(1);
                }
            }
        }
    }

    /** Writes the command-line synopsis to standard error and exits the JVM with status 1. */
    public static void usage() {
        final String synopsis = "MirrorMaker -src <topic-full-name> -dst <topic-full-name> [-consumerConfig ccfile -producerConfig pcfile -stopOnError true|false]";
        System.err.println(synopsis);
        System.exit(1);
    }

    /**
     * Entry point: parses the command line, builds the client configurations, determines the
     * partition count and then runs one {@code PartitionWorker} thread per partition, waiting
     * for all of them to finish.
     *
     * <p>Recognized options, each followed by a value: {@code -src}, {@code -dst},
     * {@code -consumerConfig}, {@code -producerConfig}, {@code -stopOnError}. Any other
     * argument, a missing value, or a missing {@code -src}/{@code -dst} prints the usage text
     * and exits.
     *
     * @param strArr command-line arguments
     * @throws IOException if a configuration file cannot be read
     */
    public static void main(String[] strArr) throws IOException {
        System.out.println("Running mirror maker");
        // Every option is a flag followed by exactly one value, so step two at a time.
        for (int i = 0; i < strArr.length; i += 2) {
            switch (strArr[i]) {
                case "-src":
                    srcTopicName = optionValue(strArr, i + 1);
                    break;
                case "-dst":
                    dstTopicName = optionValue(strArr, i + 1);
                    break;
                case "-consumerConfig":
                    consumerConfig = optionValue(strArr, i + 1);
                    break;
                case "-producerConfig":
                    producerConfig = optionValue(strArr, i + 1);
                    break;
                case "-stopOnError":
                    stopOnErr = Boolean.parseBoolean(optionValue(strArr, i + 1));
                    break;
                default:
                    usage();
                    break;
            }
        }
        if (srcTopicName == null || dstTopicName == null) {
            usage();
        }
        initConsumer();
        initProducer();
        verifyAndPossiblyCreateDstTopic();
        Thread[] workers = new Thread[nPartitions];
        for (int partition = 0; partition < nPartitions; partition++) {
            // Each worker gets its own producer/consumer pair and closes them itself.
            workers[partition] = new Thread(new PartitionWorker(new KafkaProducer<byte[], byte[]>(producerProps), new KafkaConsumer<byte[], byte[]>(consumerProps), srcTopicName, dstTopicName, partition, stopOnErr));
            workers[partition].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                System.out.println("***** Joining partitionWorkers failed *****");
                // Restore the interrupt flag so code running after main() can still observe it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns the option value at {@code valueIndex}, or prints usage and exits when it is missing. */
    private static String optionValue(String[] args, int valueIndex) {
        if (valueIndex >= args.length) {
            usage();
        }
        return args[valueIndex];
    }

    /**
     * Determines how many partitions to mirror and stores the result in {@link #nPartitions}.
     *
     * <p>The source partition list is fetched with a temporary consumer and the destination
     * partition list with a temporary producer. If the destination list is available and its
     * size differs from the source, an error is printed and the JVM exits with status 1. If
     * the destination lookup throws or yields nothing, the source partition count is used.
     * Both temporary clients are closed before returning.
     *
     * <p>NOTE(review): nothing here creates the destination topic explicitly; presumably the
     * producer-side metadata lookup may create it as a side effect — confirm against the
     * client library in use.
     */
    public static void verifyAndPossiblyCreateDstTopic() {
        KafkaProducer<byte[], byte[]> dstProbe = new KafkaProducer<byte[], byte[]>(producerProps);
        try {
            KafkaConsumer<byte[], byte[]> srcProbe = new KafkaConsumer<byte[], byte[]>(consumerProps);
            try {
                List<?> srcPartitions = srcProbe.partitionsFor(srcTopicName);
                if (srcPartitions == null || srcPartitions.isEmpty()) {
                    // Without source partitions there is nothing to mirror; fail loudly
                    // instead of hitting a NullPointerException or starting zero workers.
                    System.out.println("Source topic " + srcTopicName + " has no partitions (does it exist?)");
                    System.exit(1);
                }

                List<?> dstPartitions = null;
                try {
                    dstPartitions = dstProbe.partitionsFor(dstTopicName);
                } catch (Exception e) {
                    // Best effort: a failed destination lookup falls back to the source count.
                    System.out.println("Could not fetch partitions of destination topic " + dstTopicName
                            + " (" + e + "); using source partition count");
                }

                if (dstPartitions != null && dstPartitions.size() != srcPartitions.size()) {
                    System.out.println("Source topic and destination topic do not have equal number of partitions");
                    System.exit(1);
                }
                nPartitions = srcPartitions.size();
            } finally {
                srcProbe.close();
            }
        } finally {
            dstProbe.close();
        }
    }

    /**
     * Builds {@link #producerProps}: loads the file named by {@link #producerConfig} when one
     * was given, then fills in defaults for any of {@code bootstrap.servers},
     * {@code key.serializer} and {@code value.serializer} that the file did not set.
     *
     * @throws IOException if the producer configuration file cannot be opened or read
     */
    public static void initProducer() throws IOException {
        Properties props = new Properties();
        producerProps = props;
        if (producerConfig != null) {
            // Close the stream once loaded; it used to be left open.
            try (FileInputStream configStream = new FileInputStream(producerConfig)) {
                props.load(configStream);
            }
        }
        if (props.getProperty("bootstrap.servers") == null) {
            props.setProperty("bootstrap.servers", "localhost:9092");
        }
        if (props.getProperty("key.serializer") == null) {
            props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        if (props.getProperty("value.serializer") == null) {
            props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        }
        // NOTE(review): the previous code applied this override only when the key was already
        // present in the config file (and stored a Boolean, which getProperty cannot see), so
        // an unset key stayed at the library default. It is now always forced to "false";
        // presumably the intent is to keep mirrored records in per-partition order — confirm
        // against the MapR Streams producer configuration documentation.
        props.setProperty("streams.parallel.flushers.per.partition", "false");
    }

    /**
     * Builds {@link #consumerProps}: loads the file named by {@link #consumerConfig} when one
     * was given, then fills in defaults for any of {@code bootstrap.servers},
     * {@code key.deserializer} and {@code value.deserializer} that the file did not set.
     *
     * @throws IOException if the consumer configuration file cannot be opened or read
     */
    public static void initConsumer() throws IOException {
        Properties props = new Properties();
        consumerProps = props;
        if (consumerConfig != null) {
            // Close the stream once loaded; it used to be left open.
            try (FileInputStream configStream = new FileInputStream(consumerConfig)) {
                props.load(configStream);
            }
        }
        if (props.getProperty("bootstrap.servers") == null) {
            props.setProperty("bootstrap.servers", "localhost:9092");
        }
        if (props.getProperty("key.deserializer") == null) {
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
        if (props.getProperty("value.deserializer") == null) {
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        }
    }
}
