/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.kafka;

import com.google.common.collect.Lists;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test helper that publishes generated Avro or JSON messages to a Kafka topic.
 *
 * <p>A single set of producer properties is built in the constructor and used to create a
 * short-lived {@link KafkaProducer} for every {@code populate*} call. The producer is closed
 * before each method returns.
 */
public class KafkaMessageGenerator {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageGenerator.class);
    public static final String SCHEMA_REGISTRY_URL = "mock://testurl";

    /** Avro schema of the message value written by {@link #populateAvroMsgIntoKafka(String, int)}. */
    private static final String AVRO_VALUE_SCHEMA = "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":["
        + "{\"name\":\"key1\",\"type\":\"string\"},"
        + "{\"name\":\"key2\",\"type\":\"int\"},"
        + "{\"name\":\"key3\",\"type\":\"boolean\"},"
        + "{\"name\":\"key5\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},"
        + "{\"name\":\"key6\",\"type\":{\"type\":\"record\",\"name\":\"myrecord6\",\"fields\":["
        + "{\"name\":\"key61\",\"type\":\"double\"},"
        + "{\"name\":\"key62\",\"type\":\"double\"}]}}]}";

    /** Avro schema of the message key written by {@link #populateAvroMsgIntoKafka(String, int)}. */
    private static final String AVRO_KEY_SCHEMA =
        "{\"type\":\"record\",\"name\":\"key1record\",\"fields\":[{\"name\":\"key1\",\"type\":\"string\"}]}";

    private final Properties producerProperties = new Properties();

    /**
     * Builds the producer configuration shared by all {@code populate*} methods.
     *
     * @param broker          value used for {@code bootstrap.servers}
     * @param valueSerializer serializer class used for message values; keys default to
     *                        {@link StringSerializer}
     */
    public KafkaMessageGenerator(String broker, Class<?> valueSerializer) {
        this.producerProperties.put("bootstrap.servers", broker);
        this.producerProperties.put("acks", "all");
        this.producerProperties.put("retries", 3);
        this.producerProperties.put("batch.size", 16384);
        this.producerProperties.put("linger.ms", 0);
        this.producerProperties.put("max.in.flight.requests.per.connection", 1);
        this.producerProperties.put("request.timeout.ms", 5000);
        this.producerProperties.put("client.id", "drill-test-kafka-client");
        this.producerProperties.put("key.serializer", StringSerializer.class);
        this.producerProperties.put("value.serializer", valueSerializer);
        this.producerProperties.put("enable.idempotence", true);
        this.producerProperties.put("schema.registry.url", SCHEMA_REGISTRY_URL);
    }

    /**
     * Publishes {@code numMsg} Avro records to {@code topic}. Both key and value are Avro records;
     * the key carries the same random UUID that is stored in the value's {@code key1} field.
     *
     * <p>Sends are not awaited individually, so a failed send is only logged via the send
     * callback rather than thrown.
     *
     * @param topic  destination topic
     * @param numMsg number of records to publish; nothing is sent when it is zero or negative
     */
    public void populateAvroMsgIntoKafka(String topic, int numMsg) {
        // Override the key serializer on a private copy so the shared configuration (and therefore
        // later JSON/String producers created by this instance) keeps its original key serializer.
        // putAll is used instead of new Properties(defaults) because defaults are not exposed
        // through the map view of Properties.
        Properties avroProperties = new Properties();
        avroProperties.putAll(this.producerProperties);
        avroProperties.put("key.serializer", KafkaAvroSerializer.class);

        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(avroProperties)) {
            Schema.Parser parser = new Schema.Parser();
            Schema valueSchema = parser.parse(AVRO_VALUE_SCHEMA);
            Schema keySchema = parser.parse(AVRO_KEY_SCHEMA);
            // The nested record schema never changes, so look it up once.
            Schema innerSchema = valueSchema.getField("key6").schema();

            GenericRecordBuilder valueBuilder = new GenericRecordBuilder(valueSchema);
            GenericRecordBuilder keyBuilder = new GenericRecordBuilder(keySchema);
            Random rand = new Random();

            for (int i = 0; i < numMsg; ++i) {
                String key1 = UUID.randomUUID().toString();

                valueBuilder.set("key1", key1);
                valueBuilder.set("key2", rand.nextInt());
                valueBuilder.set("key3", rand.nextBoolean());

                ArrayList<Integer> list = Lists.newArrayList();
                list.add(rand.nextInt(100));
                list.add(rand.nextInt(100));
                list.add(rand.nextInt(100));
                valueBuilder.set("key5", list);

                GenericRecordBuilder innerBuilder = new GenericRecordBuilder(innerSchema);
                innerBuilder.set("key61", rand.nextDouble());
                innerBuilder.set("key62", rand.nextDouble());
                valueBuilder.set("key6", innerBuilder.build());
                GenericData.Record valueRecord = valueBuilder.build();

                keyBuilder.set("key1", key1);
                GenericData.Record keyRecord = keyBuilder.build();

                ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, keyRecord, valueRecord);
                // The signature has no checked exceptions, so surface delivery failures in the log.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        logger.error("Failed to publish Avro message to topic {}", topic, exception);
                    }
                });
            }
        }
    }

    /**
     * Publishes {@code numMsg} randomly generated JSON documents (fields {@code key1}, {@code key2},
     * {@code key3}, {@code key5} and nested {@code key6}) to {@code topic}, waiting for each send
     * to complete before issuing the next one.
     *
     * @param topic  destination topic
     * @param numMsg number of messages to publish
     * @throws ExecutionException   if a send fails
     * @throws InterruptedException if the thread is interrupted while waiting for a send
     */
    public void populateJsonMsgIntoKafka(String topic, int numMsg) throws ExecutionException, InterruptedException {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(this.producerProperties)) {
            Random rand = new Random();
            for (int i = 0; i < numMsg; ++i) {
                JsonObject object = new JsonObject();
                object.addProperty("key1", UUID.randomUUID().toString());
                object.addProperty("key2", rand.nextInt());
                object.addProperty("key3", rand.nextBoolean());

                JsonArray element2 = new JsonArray();
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                element2.add(new JsonPrimitive(rand.nextInt(100)));
                object.add("key5", element2);

                JsonObject element3 = new JsonObject();
                element3.addProperty("key61", rand.nextDouble());
                element3.addProperty("key62", rand.nextDouble());
                object.add("key6", element3);

                sendAndLog(producer, new ProducerRecord<>(topic, object.toString()));
            }
        }
    }

    /**
     * Publishes {@code numMsg} JSON messages with explicit record timestamps to every partition
     * that the producer reports for {@code topic}.
     *
     * <p>For message index {@code i} (1-based) the timestamp is {@code numMsg / 2 - i} while
     * {@code i} is below {@code numMsg / 2}, and {@code i} otherwise, so timestamps are not
     * monotonically increasing with the offset. The record key is {@code "key" + i}.
     *
     * @param topic  destination topic
     * @param numMsg number of messages to publish per partition
     * @throws ExecutionException   if a send fails
     * @throws InterruptedException if the thread is interrupted while waiting for a send
     */
    public void populateJsonMsgWithTimestamps(String topic, int numMsg) throws ExecutionException, InterruptedException {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(this.producerProperties)) {
            int halfCount = numMsg / 2;
            for (PartitionInfo tpInfo : producer.partitionsFor(topic)) {
                for (int i = 1; i <= numMsg; ++i) {
                    JsonObject object = new JsonObject();
                    object.addProperty("stringKey", UUID.randomUUID().toString());
                    object.addProperty("intKey", numMsg - i);
                    object.addProperty("boolKey", i % 2 == 0);

                    long timestamp = i < halfCount ? (long) (halfCount - i) : (long) i;
                    ProducerRecord<String, String> message = new ProducerRecord<>(
                        tpInfo.topic(), tpInfo.partition(), timestamp, "key" + i, object.toString());
                    sendAndLog(producer, message);
                }
            }
        }
    }

    /**
     * Publishes each of the given strings as a message value (no key) to {@code topic}, in order,
     * waiting for each send to complete before issuing the next one.
     *
     * @param topic    destination topic
     * @param messages message values to publish
     * @throws ExecutionException   if a send fails
     * @throws InterruptedException if the thread is interrupted while waiting for a send
     */
    public void populateMessages(String topic, String ... messages) throws ExecutionException, InterruptedException {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(this.producerProperties)) {
            for (String content : messages) {
                sendAndLog(producer, new ProducerRecord<>(topic, content));
            }
        }
    }

    /** Sends one record, blocks until the send completes, and logs the record and its offset. */
    private static void sendAndLog(KafkaProducer<String, String> producer, ProducerRecord<String, String> message)
            throws ExecutionException, InterruptedException {
        logger.info("Publishing message : {}", message);
        Future<RecordMetadata> future = producer.send(message);
        logger.info("Committed offset of the message : {}", future.get().offset());
    }
}

