/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.rules.jsonata;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Config;
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata;
import io.confluent.kafka.schemaregistry.client.rest.entities.Rule;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleKind;
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleMode;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rules.NewSpecificWidget;
import io.confluent.kafka.schemaregistry.rules.NewSpecificWidgetProto;
import io.confluent.kafka.schemaregistry.rules.NewerSpecificWidget;
import io.confluent.kafka.schemaregistry.rules.NewerSpecificWidgetProto;
import io.confluent.kafka.schemaregistry.rules.SpecificWidget;
import io.confluent.kafka.schemaregistry.rules.SpecificWidgetProto;
import io.confluent.kafka.schemaregistry.rules.jsonata.JsonataExecutorTest;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.RuleSet;
import io.confluent.kafka.schemaregistry.storage.RuleSetHandler;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonataExecutorIntegrationTest
extends ClusterTestHarness {
    private static final Logger log = LoggerFactory.getLogger(JsonataExecutorIntegrationTest.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    private static final String TOPIC = "widget";
    private static final UUID ID = UUID.fromString("2182b6f9-6422-43d8-819e-822b2b678eec");

    public JsonataExecutorIntegrationTest() {
        super(1, true);
    }

    @Before
    public void setUp() throws Exception {
        super.setUp();
        ((KafkaSchemaRegistry)this.restApp.schemaRegistry()).setRuleSetHandler(new RuleSetHandler(){

            public void handle(String subject, ConfigUpdateRequest request) {
            }

            public void handle(String subject, boolean normalize, RegisterSchemaRequest request) {
            }

            public RuleSet transform(io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet) {
                return ruleSet != null ? new RuleSet(ruleSet) : null;
            }
        });
    }

    private static Properties createConsumerProps(String brokerList, String schemaRegistryUrl, String applicationVersion, Map<String, Object> additionalProps) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", "avroGroup" + applicationVersion);
        props.put("session.timeout.ms", "6000");
        props.put("heartbeat.interval.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class);
        props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
        props.put("avro.use.logical.type.converters", "true");
        props.putAll(additionalProps);
        return props;
    }

    private static Consumer<String, Object> createConsumer(Properties props) {
        return new KafkaConsumer(props);
    }

    private static List<Map.Entry<String, Object>> consume(Consumer<String, Object> consumer, String topic, int numMessages) {
        ArrayList<Map.Entry<String, Object>> recordList = new ArrayList<Map.Entry<String, Object>>();
        consumer.subscribe(Arrays.asList(topic));
        int i = 0;
        do {
            ConsumerRecords records = consumer.poll(1000L);
            for (ConsumerRecord record : records) {
                recordList.add(new AbstractMap.SimpleEntry<String, Object>((String)record.key(), record.value()));
                ++i;
            }
        } while (i < numMessages);
        return recordList;
    }

    private static Properties createProducerProps(String brokerList, String schemaRegistryUrl, String applicationVersion, Map<String, Object> additionalProps) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        props.put("key.serializer", StringSerializer.class);
        props.put("auto.register.schemas", "false");
        props.put("use.latest.with.metadata", "application.version=" + applicationVersion);
        props.put("avro.use.logical.type.converters", "true");
        props.put("latest.compatibility.strict", "false");
        props.putAll(additionalProps);
        return props;
    }

    private static Producer createProducer(Properties props) {
        return new KafkaProducer(props);
    }

    private static void produce(Producer producer, String topic, String key, Object object) throws Exception {
        ProducerRecord record = new ProducerRecord(topic, (Object)key, object);
        producer.send(record).get();
    }

    @Test
    public void testAvroReflectionFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerReflectSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaAvroSerializer.class);
        additionalProps.put("value.deserializer", KafkaAvroDeserializer.class);
        additionalProps.put("schema.reflection", "true");
        ArrayList<Object> payloads = new ArrayList<Object>();
        JsonataExecutorTest.OldWidget widget = new JsonataExecutorTest.OldWidget(ID, "alice");
        widget.setSize(123);
        payloads.add(widget);
        JsonataExecutorTest.NewWidget newWidget = new JsonataExecutorTest.NewWidget(ID, "alice");
        newWidget.setHeight(123);
        payloads.add(newWidget);
        JsonataExecutorTest.NewerWidget newerWidget = new JsonataExecutorTest.NewerWidget(ID, "alice");
        newerWidget.setLength(123);
        payloads.add(newerWidget);
        this.produceAndConsume(additionalProps, payloads);
    }

    private static void registerReflectSchemas(String schemaRegistryUrl) throws Exception {
        String rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])";
        String rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])";
        String rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])";
        String rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])";
        CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 10, (List)ImmutableList.of((Object)new AvroSchemaProvider(), (Object)new ProtobufSchemaProvider(), (Object)new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        Config config = new Config();
        config.setCompatibilityLevel("NONE");
        schemaRegistry.updateConfig("widget-value", config);
        Schema schema = AvroSchemaUtils.getReflectData().getSchema(JsonataExecutorTest.OldWidget.class);
        AvroSchema avroSchema = new AvroSchema(schema);
        ImmutableSortedMap props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v1");
        Metadata metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, null);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
        schema = AvroSchemaUtils.getReflectData().getSchema(JsonataExecutorTest.NewWidget.class);
        avroSchema = new AvroSchema(schema);
        Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule1To2, null, null, false);
        Rule rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule2To1, null, null, false);
        io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v2");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
        schema = AvroSchemaUtils.getReflectData().getSchema(JsonataExecutorTest.NewerWidget.class);
        avroSchema = new AvroSchema(schema);
        rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule2To3, null, null, false);
        rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule3To2, null, null, false);
        ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v3");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
    }

    @Test
    public void testAvroGenericFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerSpecificSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaAvroSerializer.class);
        additionalProps.put("value.deserializer", KafkaAvroDeserializer.class);
        ArrayList<Object> payloads = new ArrayList<Object>();
        GenericData.Record avroRecord = new GenericData.Record(SpecificWidget.SCHEMA$);
        avroRecord.put("name", (Object)"alice");
        avroRecord.put("size", (Object)123);
        avroRecord.put("version", (Object)0);
        payloads.add(avroRecord);
        avroRecord = new GenericData.Record(NewSpecificWidget.SCHEMA$);
        avroRecord.put("name", (Object)"alice");
        avroRecord.put("height", (Object)123);
        avroRecord.put("version", (Object)0);
        payloads.add(avroRecord);
        avroRecord = new GenericData.Record(NewerSpecificWidget.SCHEMA$);
        avroRecord.put("name", (Object)"alice");
        avroRecord.put("length", (Object)123);
        avroRecord.put("version", (Object)0);
        payloads.add(avroRecord);
        this.produceAndConsume(additionalProps, payloads);
    }

    @Test
    public void testAvroSpecificFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerSpecificSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaAvroSerializer.class);
        additionalProps.put("value.deserializer", KafkaAvroDeserializer.class);
        additionalProps.put("specific.avro.reader", "true");
        ArrayList<Object> payloads = new ArrayList<Object>();
        SpecificWidget widget = new SpecificWidget();
        widget.setName("alice");
        widget.setSize(123);
        payloads.add((Object)widget);
        NewSpecificWidget newWidget = new NewSpecificWidget();
        newWidget.setName("alice");
        newWidget.setHeight(123);
        payloads.add((Object)newWidget);
        NewerSpecificWidget newerWidget = new NewerSpecificWidget();
        newerWidget.setName("alice");
        newerWidget.setLength(123);
        payloads.add((Object)newerWidget);
        this.produceAndConsume(additionalProps, payloads);
    }

    private static void registerSpecificSchemas(String schemaRegistryUrl) throws Exception {
        String rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])";
        String rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])";
        String rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])";
        String rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])";
        CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 10, (List)ImmutableList.of((Object)new AvroSchemaProvider(), (Object)new ProtobufSchemaProvider(), (Object)new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        Config config = new Config();
        config.setCompatibilityLevel("NONE");
        schemaRegistry.updateConfig("widget-value", config);
        Schema schema = SpecificWidget.SCHEMA$;
        AvroSchema avroSchema = new AvroSchema(schema);
        ImmutableSortedMap props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v1");
        Metadata metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, null);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
        schema = NewSpecificWidget.SCHEMA$;
        avroSchema = new AvroSchema(schema);
        Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule1To2, null, null, false);
        Rule rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule2To1, null, null, false);
        io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v2");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
        schema = NewerSpecificWidget.SCHEMA$;
        avroSchema = new AvroSchema(schema);
        rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule2To3, null, null, false);
        rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule3To2, null, null, false);
        ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v3");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        avroSchema = avroSchema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)avroSchema);
    }

    @Test
    public void testProtobufSpecificFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerProtobufSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaProtobufSerializer.class);
        additionalProps.put("value.deserializer", KafkaProtobufDeserializer.class);
        additionalProps.put("derive.type", "true");
        ArrayList<Object> payloads = new ArrayList<Object>();
        SpecificWidgetProto.SpecificWidget.Builder widget = SpecificWidgetProto.SpecificWidget.newBuilder();
        widget.setName("alice");
        widget.setSize(123);
        payloads.add(widget.build());
        NewSpecificWidgetProto.NewSpecificWidget.Builder newWidget = NewSpecificWidgetProto.NewSpecificWidget.newBuilder();
        newWidget.setName("alice");
        newWidget.setHeight(123);
        payloads.add(newWidget.build());
        NewerSpecificWidgetProto.NewerSpecificWidget.Builder newerWidget = NewerSpecificWidgetProto.NewerSpecificWidget.newBuilder();
        newerWidget.setName("alice");
        newerWidget.setLength(123);
        payloads.add(newerWidget.build());
        this.produceAndConsume(additionalProps, payloads);
    }

    @Test
    public void testProtobufGenericFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerProtobufSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaProtobufSerializer.class);
        additionalProps.put("value.deserializer", KafkaProtobufDeserializer.class);
        ArrayList<Object> payloads = new ArrayList<Object>();
        ProtobufSchema schema = new ProtobufSchema(SpecificWidgetProto.SpecificWidget.getDescriptor());
        DynamicMessage.Builder builder = schema.newMessageBuilder();
        builder.setField(builder.getDescriptorForType().findFieldByName("name"), (Object)"alice");
        builder.setField(builder.getDescriptorForType().findFieldByName("size"), (Object)123);
        payloads.add(builder.build());
        schema = new ProtobufSchema(NewSpecificWidgetProto.NewSpecificWidget.getDescriptor());
        builder = schema.newMessageBuilder();
        builder.setField(builder.getDescriptorForType().findFieldByName("name"), (Object)"alice");
        builder.setField(builder.getDescriptorForType().findFieldByName("height"), (Object)123);
        payloads.add(builder.build());
        schema = new ProtobufSchema(NewerSpecificWidgetProto.NewerSpecificWidget.getDescriptor());
        builder = schema.newMessageBuilder();
        builder.setField(builder.getDescriptorForType().findFieldByName("name"), (Object)"alice");
        builder.setField(builder.getDescriptorForType().findFieldByName("length"), (Object)123);
        payloads.add(builder.build());
        this.produceAndConsume(additionalProps, payloads);
    }

    private static void registerProtobufSchemas(String schemaRegistryUrl) throws Exception {
        String rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])";
        String rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])";
        String rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])";
        String rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])";
        CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 10, (List)ImmutableList.of((Object)new AvroSchemaProvider(), (Object)new ProtobufSchemaProvider(), (Object)new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        Config config = new Config();
        config.setCompatibilityLevel("NONE");
        schemaRegistry.updateConfig("widget-value", config);
        ProtobufSchema schema = new ProtobufSchema(SpecificWidgetProto.SpecificWidget.getDescriptor());
        ImmutableSortedMap props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v1");
        Metadata metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, null);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
        schema = new ProtobufSchema(NewSpecificWidgetProto.NewSpecificWidget.getDescriptor());
        Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule1To2, null, null, false);
        Rule rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule2To1, null, null, false);
        io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v2");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
        schema = new ProtobufSchema(NewerSpecificWidgetProto.NewerSpecificWidget.getDescriptor());
        rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule2To3, null, null, false);
        rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule3To2, null, null, false);
        ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v3");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
    }

    @Test
    public void testJsonSchemaPojoFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerJsonSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaJsonSchemaSerializer.class);
        additionalProps.put("value.deserializer", KafkaJsonSchemaDeserializer.class);
        ArrayList<Object> payloads = new ArrayList<Object>();
        JsonataExecutorTest.OldWidget widget = new JsonataExecutorTest.OldWidget(ID, "alice");
        widget.setSize(123);
        payloads.add(widget);
        JsonataExecutorTest.NewWidget newWidget = new JsonataExecutorTest.NewWidget(ID, "alice");
        newWidget.setHeight(123);
        payloads.add(newWidget);
        JsonataExecutorTest.NewerWidget newerWidget = new JsonataExecutorTest.NewerWidget(ID, "alice");
        newerWidget.setLength(123);
        payloads.add(newerWidget);
        this.produceAndConsume(additionalProps, payloads);
    }

    @Test
    public void testJsonSchemaJsonNodeFullyCompatible() throws Exception {
        JsonataExecutorIntegrationTest.registerJsonSchemas(this.restApp.restConnect);
        HashMap<String, Object> additionalProps = new HashMap<String, Object>();
        additionalProps.put("value.serializer", KafkaJsonSchemaSerializer.class);
        additionalProps.put("value.deserializer", KafkaJsonSchemaDeserializer.class);
        additionalProps.put("json.value.type", JsonNode.class);
        ArrayList<Object> payloads = new ArrayList<Object>();
        JsonataExecutorTest.OldWidget widget = new JsonataExecutorTest.OldWidget(ID, "alice");
        widget.setSize(123);
        payloads.add(mapper.valueToTree((Object)widget));
        JsonataExecutorTest.NewWidget newWidget = new JsonataExecutorTest.NewWidget(ID, "alice");
        newWidget.setHeight(123);
        payloads.add(mapper.valueToTree((Object)newWidget));
        JsonataExecutorTest.NewerWidget newerWidget = new JsonataExecutorTest.NewerWidget(ID, "alice");
        newerWidget.setLength(123);
        payloads.add(mapper.valueToTree((Object)newerWidget));
        this.produceAndConsume(additionalProps, payloads);
    }

    private static void registerJsonSchemas(String schemaRegistryUrl) throws Exception {
        String rule1To2 = "$merge([$sift($, function($v, $k) {$k != 'size'}), {'height': $.'size'}])";
        String rule2To1 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'size': $.'height'}])";
        String rule2To3 = "$merge([$sift($, function($v, $k) {$k != 'height'}), {'length': $.'height'}])";
        String rule3To2 = "$merge([$sift($, function($v, $k) {$k != 'length'}), {'height': $.'length'}])";
        CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(new RestService(schemaRegistryUrl), 10, (List)ImmutableList.of((Object)new AvroSchemaProvider(), (Object)new ProtobufSchemaProvider(), (Object)new JsonSchemaProvider()), Collections.emptyMap(), Collections.emptyMap());
        Config config = new Config();
        config.setCompatibilityLevel("NONE");
        schemaRegistry.updateConfig("widget-value", config);
        JsonSchema schema = JsonSchemaUtils.getSchema((Object)new JsonataExecutorTest.OldWidget());
        ImmutableSortedMap props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v1");
        Metadata metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, null);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
        schema = JsonSchemaUtils.getSchema((Object)new JsonataExecutorTest.NewWidget());
        Rule rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule1To2, null, null, false);
        Rule rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule2To1, null, null, false);
        io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v2");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
        schema = JsonSchemaUtils.getSchema((Object)new JsonataExecutorTest.NewerWidget());
        rule = new Rule("myRule1", null, RuleKind.TRANSFORM, RuleMode.UPGRADE, "JSONATA", null, null, rule2To3, null, null, false);
        rule2 = new Rule("myRule2", null, RuleKind.TRANSFORM, RuleMode.DOWNGRADE, "JSONATA", null, null, rule3To2, null, null, false);
        ruleSet = new io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet((List)ImmutableList.of((Object)rule, (Object)rule2), Collections.emptyList());
        props = ImmutableSortedMap.of((Comparable)((Object)"application.version"), (Object)"v3");
        metadata = new Metadata(Collections.emptySortedMap(), (Map)props, Collections.emptySortedSet());
        schema = schema.copy(metadata, ruleSet);
        schemaRegistry.register("widget-value", (ParsedSchema)schema);
    }

    private void produceAndConsume(Map<String, Object> additionalProps, List<Object> payloads) throws Exception {
        Map.Entry<String, Object> entry;
        List<Map.Entry<String, Object>> recordList;
        Properties producerProps = JsonataExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect, "v1", additionalProps);
        try (Producer producer = JsonataExecutorIntegrationTest.createProducer(producerProps);){
            JsonataExecutorIntegrationTest.produce(producer, TOPIC, "key1", payloads.get(0));
        }
        producerProps = JsonataExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect, "v2", additionalProps);
        producer = JsonataExecutorIntegrationTest.createProducer(producerProps);
        try {
            JsonataExecutorIntegrationTest.produce(producer, TOPIC, "key2", payloads.get(1));
        }
        finally {
            if (producer != null) {
                producer.close();
            }
        }
        producerProps = JsonataExecutorIntegrationTest.createProducerProps(this.brokerList, this.restApp.restConnect, "v3", additionalProps);
        producer = JsonataExecutorIntegrationTest.createProducer(producerProps);
        try {
            JsonataExecutorIntegrationTest.produce(producer, TOPIC, "key3", payloads.get(2));
        }
        finally {
            if (producer != null) {
                producer.close();
            }
        }
        Properties consumerProps = JsonataExecutorIntegrationTest.createConsumerProps(this.brokerList, this.restApp.restConnect, "v1", additionalProps);
        try (Consumer<String, Object> consumer = JsonataExecutorIntegrationTest.createConsumer(consumerProps);){
            recordList = JsonataExecutorIntegrationTest.consume(consumer, TOPIC, 3);
            entry = recordList.get(0);
            this.assertMessagesEqual(entry.getValue(), payloads.get(0));
        }
        consumerProps = JsonataExecutorIntegrationTest.createConsumerProps(this.brokerList, this.restApp.restConnect, "v2", additionalProps);
        consumer = JsonataExecutorIntegrationTest.createConsumer(consumerProps);
        try {
            recordList = JsonataExecutorIntegrationTest.consume(consumer, TOPIC, 3);
            entry = recordList.get(0);
            this.assertMessagesEqual(entry.getValue(), payloads.get(1));
        }
        finally {
            if (consumer != null) {
                consumer.close();
            }
        }
        consumerProps = JsonataExecutorIntegrationTest.createConsumerProps(this.brokerList, this.restApp.restConnect, "v3", additionalProps);
        consumer = JsonataExecutorIntegrationTest.createConsumer(consumerProps);
        try {
            recordList = JsonataExecutorIntegrationTest.consume(consumer, TOPIC, 3);
            entry = recordList.get(0);
            this.assertMessagesEqual(entry.getValue(), payloads.get(2));
        }
        finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    private void assertMessagesEqual(Object o1, Object o2) {
        if (o1 instanceof DynamicMessage) {
            Assert.assertTrue((boolean)(o2 instanceof DynamicMessage));
            DynamicMessage d2 = (DynamicMessage)o2;
            for (Map.Entry entry : ((DynamicMessage)o1).getAllFields().entrySet()) {
                Descriptors.FieldDescriptor f1 = (Descriptors.FieldDescriptor)entry.getKey();
                Assert.assertEquals(entry.getValue(), (Object)d2.getField(d2.getDescriptorForType().findFieldByName(f1.getName())));
            }
        } else {
            Assert.assertEquals((Object)o1, (Object)o2);
        }
    }
}

