/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test helper that publishes randomly generated Avro or JSON messages to a Kafka topic.
 *
 * <p>Every {@code populate*} method creates its own {@link KafkaProducer} from the
 * properties assembled in the constructor and closes it before returning, including
 * when publishing fails part-way through.
 */
public class KafkaMessageGenerator {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class);

    /** Producer configuration shared by all producers created by this generator. */
    private final Properties producerProperties = new Properties();

    /**
     * Builds the producer configuration.
     *
     * @param broker value stored under {@code bootstrap.servers}
     * @param valueSerializer class stored under {@code value.serializer}; keys are always
     *     serialized with {@link StringSerializer}
     */
    public KafkaMessageGenerator(String broker, Class<?> valueSerializer) {
        producerProperties.put("bootstrap.servers", broker);
        producerProperties.put("acks", "all");
        producerProperties.put("retries", 3);
        producerProperties.put("batch.size", 16384);
        producerProperties.put("linger.ms", 0);
        producerProperties.put("max.in.flight.requests.per.connection", 1);
        producerProperties.put("request.timeout.ms", 5000);
        producerProperties.put("client.id", "drill-test-kafka-client");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", valueSerializer);
        producerProperties.put("enable.idempotence", true);
    }

    /**
     * Publishes {@code numMsg} random Avro records, built against the
     * {@code drill-avro-test.avsc} schema resource, to {@code topic}.
     *
     * <p>Sends are fire-and-forget: the returned futures are not inspected, so individual
     * send failures are not reported by this method. Closing the producer at the end is
     * what flushes outstanding records (per the {@code KafkaProducer.close()} contract).
     *
     * @param topic destination topic
     * @param numMsg number of records to publish
     * @throws IOException if the schema resource cannot be opened or parsed
     */
    public void populateAvroMsgIntoKafka(String topic, int numMsg) throws IOException {
        Schema schema;
        // Close the schema stream explicitly; the parser is not relied upon to do so.
        try (InputStream schemaStream = Resources.getResource("drill-avro-test.avsc").openStream()) {
            schema = new Schema.Parser().parse(schemaStream);
        }

        // try-with-resources guarantees the producer is closed even if building or
        // sending a record throws.
        try (KafkaProducer<String, GenericData.Record> producer = new KafkaProducer<>(producerProperties)) {
            GenericRecordBuilder builder = new GenericRecordBuilder(schema);
            Random rand = new Random();
            for (int i = 0; i < numMsg; ++i) {
                builder.set("key1", UUID.randomUUID().toString());
                builder.set("key2", rand.nextInt());
                builder.set("key3", rand.nextBoolean());

                List<Integer> list = Lists.newArrayList();
                list.add(rand.nextInt(100));
                list.add(rand.nextInt(100));
                list.add(rand.nextInt(100));
                builder.set("key5", list);

                Map<String, Double> map = Maps.newHashMap();
                map.put("key61", rand.nextDouble());
                map.put("key62", rand.nextDouble());
                builder.set("key6", map);

                GenericData.Record avroRecord = builder.build();
                producer.send(new ProducerRecord<>(topic, avroRecord));
            }
        }
    }

    /**
     * Publishes {@code numMsg} random JSON documents to {@code topic}, waiting for each
     * send to be acknowledged before producing the next one.
     *
     * @param topic destination topic
     * @param numMsg number of messages to publish
     * @throws InterruptedException if interrupted while waiting for an acknowledgement
     * @throws ExecutionException if a send fails
     */
    public void populateJsonMsgIntoKafka(String topic, int numMsg) throws InterruptedException, ExecutionException {
        Random rand = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            for (int i = 0; i < numMsg; ++i) {
                JsonObject object = new JsonObject();
                object.addProperty("key1", UUID.randomUUID().toString());
                object.addProperty("key2", rand.nextInt());
                object.addProperty("key3", rand.nextBoolean());

                JsonArray element2 = new JsonArray();
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                object.add("key5", element2);

                JsonObject element3 = new JsonObject();
                element3.addProperty("key61", rand.nextDouble());
                element3.addProperty("key62", rand.nextDouble());
                object.add("key6", element3);

                ProducerRecord<String, String> message = new ProducerRecord<>(topic, object.toString());
                logger.info("Publishing message : {}", message);
                Future<RecordMetadata> future = producer.send(message);
                logger.info("Committed offset of the message : {}", future.get().offset());
            }
        }
    }

    /**
     * Publishes {@code numMsg} JSON documents with explicit record timestamps to every
     * partition of {@code topic}, waiting for each send to be acknowledged.
     *
     * <p>For message index {@code i} (1-based) the timestamp is {@code numMsg / 2 - i}
     * while {@code i < numMsg / 2}, and {@code i} otherwise; the key is {@code "key" + i}.
     *
     * @param topic topic whose partitions are populated
     * @param numMsg number of messages to publish per partition
     * @throws IllegalStateException if a send fails or the thread is interrupted while
     *     waiting for an acknowledgement (the interrupt flag is restored in that case)
     */
    public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
        // The producer must be created in the resource header: resource variables are
        // implicitly final and cannot be assigned inside the try body.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            int halfCount = numMsg / 2;
            for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
                for (int i = 1; i <= numMsg; ++i) {
                    JsonObject object = new JsonObject();
                    object.addProperty("stringKey", UUID.randomUUID().toString());
                    object.addProperty("intKey", numMsg - i);
                    object.addProperty("boolKey", i % 2 == 0);

                    long timestamp = i < halfCount ? (long) (halfCount - i) : (long) i;
                    ProducerRecord<String, String> message = new ProducerRecord<>(
                            tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString());
                    logger.info("Publishing message : {}", message);
                    Future<RecordMetadata> future = producer.send(message);
                    logger.info("Committed offset of the message : {}", future.get().offset());
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers; the signature declares no checked exceptions.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while publishing messages to topic " + topic, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to publish message to topic " + topic, e);
        }
    }
}

