/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.rules.cel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rules.Customer;
import io.confluent.kafka.schemaregistry.rules.ResourceLoader;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.RuleSet;
import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.WrapperKeyDeserializer;
import io.confluent.kafka.serializers.WrapperKeySerializer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CelExecutorIntegrationTest
extends ClusterTestHarness {
    private static final Logger log = LoggerFactory.getLogger(CelExecutorIntegrationTest.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    private static final String TOPIC = "customer";
    private static final String DLQ_TOPIC1 = "DLQ1";
    private static final String DLQ_TOPIC2 = "DLQ2";
    private static final String DLQ_TOPIC3 = "DLQ3";

    public CelExecutorIntegrationTest() {
        super(1, true);
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        ((KafkaSchemaRegistry)this.restApp.schemaRegistry()).setRuleSetHandler(new RuleSetHandler(){

            public void handle(String subject, ConfigUpdateRequest request) {
            }

            public void handle(String subject, boolean normalize, RegisterSchemaRequest request) {
            }

            public RuleSet transform(io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet) {
                return ruleSet != null ? new RuleSet(ruleSet) : null;
            }
        });
    }

    private static void registerSchema(String schemaRegistryUrl) throws Exception {
        ResourceLoader resourceLoader = new ResourceLoader("/");
        String schemaString = resourceLoader.toString("test_cel.json");
        Schema schema = (Schema)mapper.readValue(schemaString, Schema.class);
        CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 10, (List)ImmutableList.of((Object)new AvroSchemaProvider(), (Object)new ProtobufSchemaProvider(), (Object)new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        Optional parsedSchema = schemaRegistry.parseSchema(schema);
        schemaRegistry.register("customer-value", (ParsedSchema)parsedSchema.get());
    }

    private static Properties createConsumerProps(String brokerList, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "avroGroup");
        props.put("session.timeout.ms", "6000");
        props.put("heartbeat.interval.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", WrapperKeyDeserializer.class);
        props.put("value.deserializer", KafkaAvroDeserializer.class);
        props.put("specific.avro.reader", (Object)true);
        props.put("wrapped.key.deserializer", StringDeserializer.class);
        props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        return props;
    }

    private static Properties createDlqConsumerProps(String brokerList, String schemaRegistryUrl, Class<?> valueDeserializer) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "avroGroup");
        props.put("session.timeout.ms", "6000");
        props.put("heartbeat.interval.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", WrapperKeyDeserializer.class);
        props.put("value.deserializer", valueDeserializer);
        props.put("specific.avro.reader", (Object)true);
        props.put("wrapped.key.deserializer", StringDeserializer.class);
        props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        return props;
    }

    private static Consumer<String, Object> createConsumer(Properties props) {
        return new KafkaConsumer(props);
    }

    private static List<Map.Entry<String, Object>> consume(Consumer<String, Object> consumer, String topic, int numMessages) {
        ArrayList<Map.Entry<String, Object>> recordList = new ArrayList<Map.Entry<String, Object>>();
        consumer.subscribe(Arrays.asList(topic));
        int i = 0;
        do {
            ConsumerRecords records = consumer.poll(1000L);
            for (ConsumerRecord record : records) {
                recordList.add(new AbstractMap.SimpleEntry<String, Object>((String)record.key(), record.value()));
                ++i;
            }
        } while (i < numMessages);
        return recordList;
    }

    private static Properties createProducerProps(String brokerList, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        props.put("key.serializer", WrapperKeySerializer.class);
        props.put("value.serializer", KafkaAvroSerializer.class);
        props.put("wrapped.key.serializer", StringSerializer.class);
        props.put("auto.register.schemas", "false");
        props.put("use.latest.version", "true");
        props.put("latest.compatibility.strict", "false");
        return props;
    }

    private static Producer createProducer(Properties props) {
        return new KafkaProducer(props);
    }

    private static void produce(Producer producer, String topic, String key, Object object) throws Exception {
        ProducerRecord record = new ProducerRecord(topic, (Object)key, object);
        producer.send(record).get();
    }

    private static Object createPayload(boolean isActive, String mode) {
        Customer value = new Customer();
        value.setSsn("123456789");
        value.setAddress("#10 abc, CA 94402");
        value.setMail("david@confluent.io");
        value.setUserId("uid_23434");
        value.setAge(45);
        value.setIBAN("GB33BUKB20201555555555");
        value.setActive(isActive);
        value.setBalance(new Float(10.0).floatValue());
        value.setMode(mode);
        return value;
    }

    @Test
    public void testAvroProducerFailWrite() throws Exception {
        CelExecutorIntegrationTest.registerSchema(this.restApp.restConnect);
        Object payload = CelExecutorIntegrationTest.createPayload(true, "fail_write");
        Properties producerProps = CelExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect);
        try (Producer producer2 = CelExecutorIntegrationTest.createProducer(producerProps);){
            CelExecutorIntegrationTest.produce(producer2, TOPIC, "key", payload);
        }
        catch (Exception producer2) {
            // empty catch block
        }
        Properties consumerProps = CelExecutorIntegrationTest.createDlqConsumerProps(this.brokerList, this.restApp.restConnect, StringDeserializer.class);
        try (Consumer<String, Object> consumer = CelExecutorIntegrationTest.createConsumer(consumerProps);){
            List<Map.Entry<String, Object>> recordList = CelExecutorIntegrationTest.consume(consumer, DLQ_TOPIC2, 1);
            Map.Entry<String, Object> entry = recordList.get(0);
            String record = (String)entry.getValue();
            Customer avroRecord = (Customer)((Object)AvroSchemaUtils.toObject((String)record, (AvroSchema)new AvroSchema(Customer.SCHEMA$), (DatumReader)new SpecificDatumReader(Customer.SCHEMA$)));
            Assert.assertEquals((Object)"key", (Object)entry.getKey());
            Assert.assertEquals((Object)payload, (Object)((Object)avroRecord));
        }
    }

    @Test
    public void testAvroProducerFailRead() throws Exception {
        CelExecutorIntegrationTest.registerSchema(this.restApp.restConnect);
        Object payload = CelExecutorIntegrationTest.createPayload(true, "fail_read");
        Properties producerProps = CelExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect);
        try (Producer producer2 = CelExecutorIntegrationTest.createProducer(producerProps);){
            CelExecutorIntegrationTest.produce(producer2, TOPIC, "key", payload);
        }
        catch (Exception producer2) {
            // empty catch block
        }
        Properties consumerProps = CelExecutorIntegrationTest.createConsumerProps(this.brokerList, this.restApp.restConnect);
        try (Consumer<String, Object> consumer = CelExecutorIntegrationTest.createConsumer(consumerProps);){
            try {
                CelExecutorIntegrationTest.consume(consumer, TOPIC, 0);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        Properties dlqConsumerProps = CelExecutorIntegrationTest.createDlqConsumerProps(this.brokerList, this.restApp.restConnect, KafkaAvroDeserializer.class);
        try (Consumer<String, Object> consumer = CelExecutorIntegrationTest.createConsumer(dlqConsumerProps);){
            List<Map.Entry<String, Object>> recordList = CelExecutorIntegrationTest.consume(consumer, DLQ_TOPIC3, 1);
            Map.Entry<String, Object> entry = recordList.get(0);
            Object record = entry.getValue();
            Assert.assertEquals((Object)"key", (Object)entry.getKey());
            Assert.assertEquals((Object)payload, (Object)record);
        }
    }

    @Test
    public void testAvroProducerSuccess() throws Exception {
        CelExecutorIntegrationTest.registerSchema(this.restApp.restConnect);
        Object payload = CelExecutorIntegrationTest.createPayload(true, "success");
        Properties producerProps = CelExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect);
        try (Producer producer2 = CelExecutorIntegrationTest.createProducer(producerProps);){
            CelExecutorIntegrationTest.produce(producer2, TOPIC, "key", payload);
        }
        catch (Exception producer2) {
            // empty catch block
        }
        Properties consumerProps = CelExecutorIntegrationTest.createConsumerProps(this.brokerList, this.restApp.restConnect);
        try (Consumer<String, Object> consumer = CelExecutorIntegrationTest.createConsumer(consumerProps);){
            List<Map.Entry<String, Object>> recordList = CelExecutorIntegrationTest.consume(consumer, TOPIC, 1);
            Map.Entry<String, Object> entry = recordList.get(0);
            Object record = entry.getValue();
            Assert.assertEquals((Object)"key", (Object)entry.getKey());
            Assert.assertEquals((Object)payload, (Object)record);
        }
    }

    @Test
    public void testAvroProducerDoubleDlq() throws Exception {
        Customer avroRecord;
        String record;
        Map.Entry<String, Object> entry;
        List<Map.Entry<String, Object>> recordList;
        CelExecutorIntegrationTest.registerSchema(this.restApp.restConnect);
        Object payload1 = CelExecutorIntegrationTest.createPayload(false, "success");
        Object payload2 = CelExecutorIntegrationTest.createPayload(true, "fail_write");
        Properties producerProps = CelExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect);
        try (Producer producer = CelExecutorIntegrationTest.createProducer(producerProps);){
            try {
                CelExecutorIntegrationTest.produce(producer, TOPIC, "key1", payload1);
            }
            catch (Exception exception) {
                // empty catch block
            }
            try {
                CelExecutorIntegrationTest.produce(producer, TOPIC, "key2", payload2);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        Properties consumerProps = CelExecutorIntegrationTest.createDlqConsumerProps(this.brokerList, this.restApp.restConnect, StringDeserializer.class);
        try (Consumer<String, Object> consumer = CelExecutorIntegrationTest.createConsumer(consumerProps);){
            recordList = CelExecutorIntegrationTest.consume(consumer, DLQ_TOPIC1, 1);
            entry = recordList.get(0);
            record = (String)entry.getValue();
            avroRecord = (Customer)((Object)AvroSchemaUtils.toObject((String)record, (AvroSchema)new AvroSchema(Customer.SCHEMA$), (DatumReader)new SpecificDatumReader(Customer.SCHEMA$)));
            Assert.assertEquals((Object)"key1", (Object)entry.getKey());
            Assert.assertEquals((Object)payload1, (Object)((Object)avroRecord));
        }
        consumer = CelExecutorIntegrationTest.createConsumer(consumerProps);
        try {
            recordList = CelExecutorIntegrationTest.consume(consumer, DLQ_TOPIC2, 1);
            entry = recordList.get(0);
            record = (String)entry.getValue();
            avroRecord = (Customer)((Object)AvroSchemaUtils.toObject((String)record, (AvroSchema)new AvroSchema(Customer.SCHEMA$), (DatumReader)new SpecificDatumReader(Customer.SCHEMA$)));
            Assert.assertEquals((Object)"key2", (Object)entry.getKey());
            Assert.assertEquals((Object)payload2, (Object)((Object)avroRecord));
        }
        finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}

