package io.confluent.kafka.schemaregistry.rules.cel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rules.Customer;
import io.confluent.kafka.schemaregistry.rules.ResourceLoader;
import io.confluent.kafka.schemaregistry.storage.RuleSet;
import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.WrapperKeyDeserializer;
import io.confluent.kafka.serializers.WrapperKeySerializer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/rules/cel/CelExecutorIntegrationTest.class */
public class CelExecutorIntegrationTest extends ClusterTestHarness {
    private static final Logger log = LoggerFactory.getLogger(CelExecutorIntegrationTest.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    private static final String TOPIC = "customer";
    private static final String DLQ_TOPIC1 = "DLQ1";
    private static final String DLQ_TOPIC2 = "DLQ2";
    private static final String DLQ_TOPIC3 = "DLQ3";

    public CelExecutorIntegrationTest() {
        super(1, true);
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.restApp.schemaRegistry().setRuleSetHandler(new RuleSetHandler() { // from class: io.confluent.kafka.schemaregistry.rules.cel.CelExecutorIntegrationTest.1
            public void handle(String str, ConfigUpdateRequest configUpdateRequest) {
            }

            public void handle(String str, boolean z, RegisterSchemaRequest registerSchemaRequest) {
            }

            public RuleSet transform(io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet) {
                if (ruleSet != null) {
                    return new RuleSet(ruleSet);
                }
                return null;
            }
        });
    }

    private static void registerSchema(String str) throws Exception {
        Schema schema = (Schema) mapper.readValue(new ResourceLoader("/").toString("test_cel.json"), Schema.class);
        CachedSchemaRegistryClient cachedSchemaRegistryClient = new CachedSchemaRegistryClient(new RestService(str), 10, ImmutableList.of(new AvroSchemaProvider(), new ProtobufSchemaProvider(), new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        cachedSchemaRegistryClient.register("customer-value", (ParsedSchema) cachedSchemaRegistryClient.parseSchema(schema).get());
    }

    private static Properties createConsumerProps(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("group.id", "avroGroup");
        properties.put("session.timeout.ms", "6000");
        properties.put("heartbeat.interval.ms", "2000");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", WrapperKeyDeserializer.class);
        properties.put("value.deserializer", KafkaAvroDeserializer.class);
        properties.put("specific.avro.reader", true);
        properties.put("wrapped.key.deserializer", StringDeserializer.class);
        properties.put(SCHEMA_REGISTRY_URL, str2);
        return properties;
    }

    private static Properties createDlqConsumerProps(String str, String str2, Class<?> cls) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("group.id", "avroGroup");
        properties.put("session.timeout.ms", "6000");
        properties.put("heartbeat.interval.ms", "2000");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", WrapperKeyDeserializer.class);
        properties.put("value.deserializer", cls);
        properties.put("specific.avro.reader", true);
        properties.put("wrapped.key.deserializer", StringDeserializer.class);
        properties.put(SCHEMA_REGISTRY_URL, str2);
        return properties;
    }

    private static Consumer<String, Object> createConsumer(Properties properties) {
        return new KafkaConsumer(properties);
    }

    private static List<Map.Entry<String, Object>> consume(Consumer<String, Object> consumer, String str, int i) {
        ArrayList arrayList = new ArrayList();
        consumer.subscribe(Arrays.asList(str));
        int i2 = 0;
        do {
            Iterator it = consumer.poll(1000L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                arrayList.add(new AbstractMap.SimpleEntry((String) consumerRecord.key(), consumerRecord.value()));
                i2++;
            }
        } while (i2 < i);
        return arrayList;
    }

    private static Properties createProducerProps(String str, String str2) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put(SCHEMA_REGISTRY_URL, str2);
        properties.put("key.serializer", WrapperKeySerializer.class);
        properties.put("value.serializer", KafkaAvroSerializer.class);
        properties.put("wrapped.key.serializer", StringSerializer.class);
        properties.put("auto.register.schemas", "false");
        properties.put("use.latest.version", "true");
        properties.put("latest.compatibility.strict", "false");
        return properties;
    }

    private static Producer createProducer(Properties properties) {
        return new KafkaProducer(properties);
    }

    private static void produce(Producer producer, String str, String str2, Object obj) throws Exception {
        producer.send(new ProducerRecord(str, str2, obj)).get();
    }

    private static Object createPayload(boolean z, String str) {
        Customer customer = new Customer();
        customer.setSsn("123456789");
        customer.setAddress("#10 abc, CA 94402");
        customer.setMail("david@confluent.io");
        customer.setUserId("uid_23434");
        customer.setAge(45);
        customer.setIBAN("GB33BUKB20201555555555");
        customer.setActive(z);
        customer.setBalance(new Float(10.0d).floatValue());
        customer.setMode(str);
        return customer;
    }

    @Test
    public void testAvroProducerFailWrite() throws Exception {
        registerSchema(this.restApp.restConnect);
        Object createPayload = createPayload(true, "fail_write");
        try {
            Producer createProducer = createProducer(createProducerProps(this.brokerList, this.restApp.restConnect));
            try {
                produce(createProducer, TOPIC, "key", createPayload);
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Exception e) {
        }
        Consumer<String, Object> createConsumer = createConsumer(createDlqConsumerProps(this.brokerList, this.restApp.restConnect, StringDeserializer.class));
        try {
            Map.Entry<String, Object> entry = consume(createConsumer, DLQ_TOPIC2, 1).get(0);
            Customer customer = (Customer) AvroSchemaUtils.toObject((String) entry.getValue(), new AvroSchema(Customer.SCHEMA$), new SpecificDatumReader(Customer.SCHEMA$));
            Assert.assertEquals("key", entry.getKey());
            Assert.assertEquals(createPayload, customer);
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAvroProducerFailRead() throws Exception {
        registerSchema(this.restApp.restConnect);
        Object createPayload = createPayload(true, "fail_read");
        try {
            Producer createProducer = createProducer(createProducerProps(this.brokerList, this.restApp.restConnect));
            try {
                produce(createProducer, TOPIC, "key", createPayload);
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Exception e) {
        }
        Consumer<String, Object> createConsumer = createConsumer(createConsumerProps(this.brokerList, this.restApp.restConnect));
        try {
            consume(createConsumer, TOPIC, 0);
        } catch (Exception e2) {
        } catch (Throwable th) {
            throw th;
        }
        if (createConsumer != null) {
            createConsumer.close();
        }
        createConsumer = createConsumer(createDlqConsumerProps(this.brokerList, this.restApp.restConnect, KafkaAvroDeserializer.class));
        try {
            Map.Entry<String, Object> entry = consume(createConsumer, DLQ_TOPIC3, 1).get(0);
            Object value = entry.getValue();
            Assert.assertEquals("key", entry.getKey());
            Assert.assertEquals(createPayload, value);
            if (createConsumer != null) {
                createConsumer.close();
            }
        } finally {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        }
    }

    @Test
    public void testAvroProducerSuccess() throws Exception {
        registerSchema(this.restApp.restConnect);
        Object createPayload = createPayload(true, "success");
        try {
            Producer createProducer = createProducer(createProducerProps(this.brokerList, this.restApp.restConnect));
            try {
                produce(createProducer, TOPIC, "key", createPayload);
                if (createProducer != null) {
                    createProducer.close();
                }
            } finally {
            }
        } catch (Exception e) {
        }
        Consumer<String, Object> createConsumer = createConsumer(createConsumerProps(this.brokerList, this.restApp.restConnect));
        try {
            Map.Entry<String, Object> entry = consume(createConsumer, TOPIC, 1).get(0);
            Object value = entry.getValue();
            Assert.assertEquals("key", entry.getKey());
            Assert.assertEquals(createPayload, value);
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testAvroProducerDoubleDlq() throws Exception {
        registerSchema(this.restApp.restConnect);
        Object createPayload = createPayload(false, "success");
        Object createPayload2 = createPayload(true, "fail_write");
        Producer createProducer = createProducer(createProducerProps(this.brokerList, this.restApp.restConnect));
        try {
            try {
                produce(createProducer, TOPIC, "key1", createPayload);
            } catch (Throwable th) {
                if (createProducer != null) {
                    try {
                        createProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Exception e) {
        }
        try {
            produce(createProducer, TOPIC, "key2", createPayload2);
        } catch (Exception e2) {
        }
        if (createProducer != null) {
            createProducer.close();
        }
        Properties createDlqConsumerProps = createDlqConsumerProps(this.brokerList, this.restApp.restConnect, StringDeserializer.class);
        Consumer<String, Object> createConsumer = createConsumer(createDlqConsumerProps);
        try {
            Map.Entry<String, Object> entry = consume(createConsumer, DLQ_TOPIC1, 1).get(0);
            Customer customer = (Customer) AvroSchemaUtils.toObject((String) entry.getValue(), new AvroSchema(Customer.SCHEMA$), new SpecificDatumReader(Customer.SCHEMA$));
            Assert.assertEquals("key1", entry.getKey());
            Assert.assertEquals(createPayload, customer);
            if (createConsumer != null) {
                createConsumer.close();
            }
            createConsumer = createConsumer(createDlqConsumerProps);
            try {
                Map.Entry<String, Object> entry2 = consume(createConsumer, DLQ_TOPIC2, 1).get(0);
                Customer customer2 = (Customer) AvroSchemaUtils.toObject((String) entry2.getValue(), new AvroSchema(Customer.SCHEMA$), new SpecificDatumReader(Customer.SCHEMA$));
                Assert.assertEquals("key2", entry2.getKey());
                Assert.assertEquals(createPayload2, customer2);
                if (createConsumer != null) {
                    createConsumer.close();
                }
            } finally {
            }
        } finally {
        }
    }
}
