/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import scala.Array$;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.collection.JavaConversions$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.DoubleRef;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.util.Random$;

public final class EndToEndLatency$ {
    public static final EndToEndLatency$ MODULE$;
    private final long kafka$tools$EndToEndLatency$$timeout;

    static {
        new EndToEndLatency$();
    }

    public long kafka$tools$EndToEndLatency$$timeout() {
        return this.kafka$tools$EndToEndLatency$$timeout;
    }

    public void main(String[] args2) {
        String sslPropsFile;
        if (args2.length != 5 && args2.length != 6) {
            System.err.println(new StringBuilder().append((Object)"USAGE: java ").append((Object)this.getClass().getName()).append((Object)" broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file").toString());
            System.exit(1);
        }
        String brokerList = args2[0];
        String topic = args2[1];
        int numMessages = new StringOps(Predef$.MODULE$.augmentString(args2[2])).toInt();
        String producerAcks = args2[3];
        int messageLen = new StringOps(Predef$.MODULE$.augmentString(args2[4])).toInt();
        String string2 = sslPropsFile = args2.length == 6 ? args2[5] : "";
        if (((List)List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"1", "all"}))).contains(producerAcks)) {
            Properties consumerProps = sslPropsFile.equals("") ? new Properties() : Utils.loadProps(sslPropsFile);
            consumerProps.put("bootstrap.servers", brokerList);
            consumerProps.put("group.id", new StringBuilder().append((Object)"test-group-").append(BoxesRunTime.boxToLong(System.currentTimeMillis())).toString());
            consumerProps.put("enable.auto.commit", "false");
            consumerProps.put("auto.offset.reset", "latest");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("fetch.max.wait.ms", "0");
            KafkaConsumer consumer = new KafkaConsumer(consumerProps);
            consumer.subscribe(JavaConversions$.MODULE$.seqAsJavaList(List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic}))));
            Properties producerProps = sslPropsFile.equals("") ? new Properties() : Utils.loadProps(sslPropsFile);
            producerProps.put("bootstrap.servers", brokerList);
            producerProps.put("linger.ms", "0");
            producerProps.put("block.on.buffer.full", "true");
            producerProps.put("acks", producerAcks.toString());
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer producer = new KafkaProducer(producerProps);
            consumer.seekToEnd(new TopicPartition[0]);
            consumer.poll(0L);
            DoubleRef totalTime = new DoubleRef(0.0);
            long[] latencies = new long[numMessages];
            RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).foreach$mVc$sp((Function1<Object, BoxedUnit>)((Object)new Serializable(topic, messageLen, consumer, producer, totalTime, latencies){
                public static final long serialVersionUID = 0L;
                private final String topic$1;
                private final int messageLen$1;
                private final KafkaConsumer consumer$1;
                private final KafkaProducer producer$1;
                private final DoubleRef totalTime$1;
                private final long[] latencies$1;

                public final void apply(int i) {
                    this.apply$mcVI$sp(i);
                }

                /*
                 * WARNING - void declaration
                 */
                public void apply$mcVI$sp(int i) {
                    byte[] message = EndToEndLatency$.MODULE$.randomBytesOfLen(this.messageLen$1);
                    long begin = System.nanoTime();
                    this.producer$1.send(new ProducerRecord<K, byte[]>(this.topic$1, message)).get();
                    Iterator<ConsumerRecord<K, V>> recordIter = this.consumer$1.poll(EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$timeout()).iterator();
                    long elapsed = System.nanoTime() - begin;
                    if (recordIter.hasNext()) {
                        void var8_6;
                        void var9_7;
                        String sent = new String(message);
                        String read2 = new String((byte[])recordIter.next().value());
                        if (read2.equals(sent)) {
                            if (recordIter.hasNext()) {
                                IntRef count2 = new IntRef(1);
                                JavaConversions$.MODULE$.asScalaIterator(recordIter).foreach(new Serializable(this, count2){
                                    public static final long serialVersionUID = 0L;
                                    private final IntRef count$1;

                                    public final void apply(ConsumerRecord<byte[], byte[]> elem2) {
                                        ++this.count$1.elem;
                                    }
                                    {
                                        this.count$1 = count$1;
                                    }
                                });
                                throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Only one result was expected during this test. We found [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(count2.elem)})));
                            }
                            if (i % 1000 == 0) {
                                Predef$.MODULE$.println(new StringBuilder().append(i).append((Object)"\t").append(BoxesRunTime.boxToDouble((double)elapsed / 1000.0 / 1000.0)).toString());
                            }
                            this.totalTime$1.elem += (double)elapsed;
                            this.latencies$1[i] = elapsed / 1000L / 1000L;
                            return;
                        }
                        EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$finalise$1(this.consumer$1, this.producer$1);
                        throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"The message read [", "] did not match the message sent [", "]"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{var9_7, var8_6})));
                    }
                    EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$finalise$1(this.consumer$1, this.producer$1);
                    throw new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray((Object[])new String[]{"poll() timed out before finding a result (timeout:[", "])"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(EndToEndLatency$.MODULE$.kafka$tools$EndToEndLatency$$timeout())})));
                }
                {
                    this.topic$1 = topic$1;
                    this.messageLen$1 = messageLen$1;
                    this.consumer$1 = consumer$1;
                    this.producer$1 = producer$1;
                    this.totalTime$1 = totalTime$1;
                    this.latencies$1 = latencies$1;
                }
            }));
            Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Avg latency: %.4f ms\n")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToDouble(totalTime.elem / (double)numMessages / 1000.0 / 1000.0)})));
            Arrays.sort(latencies);
            long p50 = latencies[(int)((double)latencies.length * 0.5)];
            long p99 = latencies[(int)((double)latencies.length * 0.99)];
            long p999 = latencies[(int)((double)latencies.length * 0.999)];
            Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Percentiles: 50th = %d, 99th = %d, 99.9th = %d")).format(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToLong(p50), BoxesRunTime.boxToLong(p99), BoxesRunTime.boxToLong(p999)})));
            this.kafka$tools$EndToEndLatency$$finalise$1(consumer, producer);
            return;
        }
        throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
    }

    public byte[] randomBytesOfLen(int len) {
        return (byte[])Array$.MODULE$.fill(len, new Serializable(){
            public static final long serialVersionUID = 0L;

            public final byte apply() {
                return this.apply$mcB$sp();
            }

            public byte apply$mcB$sp() {
                return (byte)(Random$.MODULE$.nextInt(26) + 65);
            }
        }, ClassTag$.MODULE$.Byte());
    }

    public final void kafka$tools$EndToEndLatency$$finalise$1(KafkaConsumer consumer$1, KafkaProducer producer$1) {
        consumer$1.commitSync();
        producer$1.close();
        consumer$1.close();
    }

    private EndToEndLatency$() {
        MODULE$ = this;
        this.kafka$tools$EndToEndLatency$$timeout = 60000L;
    }
}

