/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.formatter;

import io.confluent.kafka.formatter.AvroMessageFormatter;
import io.confluent.kafka.formatter.AvroMessageReader;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Round-trip tests for {@link AvroMessageReader} and {@link AvroMessageFormatter}.
 *
 * <p>Each test feeds a line of console-style text to an {@code AvroMessageReader}, wraps the
 * serialized bytes it produces in a {@link ConsumerRecord}, hands that record to an
 * {@code AvroMessageFormatter}, and compares the text the formatter prints with the expected
 * text. All schema lookups go through {@link MockSchemaRegistry} scopes, so no real registry is
 * contacted.
 *
 * <p>All text/byte conversions in this class use UTF-8 explicitly so that results do not depend
 * on the platform default charset.
 */
public class KafkaAvroFormatterTest {
    private static final String RECORD_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";
    private static final String RECORD_KEY_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"keyRecord\",\"fields\": [{\"name\": \"key_field\", \"type\": \"string\"}]}";
    private static final String RECORD_VALUE_SCHEMA_STRING = "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"valueRecord\",\"fields\": [{\"name\": \"value_field\", \"type\": \"string\"}]}";
    private static final String INT_SCHEMA_STRING = "{\"type\" : \"int\"}";

    /** Mock registry scope/URL used by the tests that build reader and formatter directly. */
    private static final String TEST_SCOPE = "test";
    private static final String TEST_URL = "mock://test";

    /** Mock registry scope/URL used by the tests that configure reader and formatter via properties. */
    private static final String FOO_SCOPE = "foo";
    private static final String FOO_URL = "mock://foo";

    private static final String TOPIC = "topic1";
    private static final String CONFIGURED_TOPIC = "mytopic";

    /** One line of JSON matching {@link #RECORD_SCHEMA_STRING}, newline-terminated like console input. */
    private static final String USER_JSON = "{\"name\":\"myname\"}\n";
    private static final String STRING_KEY = "TestKey";

    private static final long OFFSET = 200L;
    private static final long TIMESTAMP = 1000L;
    private static final TimestampType TIMESTAMP_TYPE = TimestampType.LOG_APPEND_TIME;

    private Properties props;
    private AvroMessageFormatter formatter;
    private Schema recordSchema;
    private Schema intSchema;
    private SchemaRegistryClient schemaRegistry;

    /** Builds fresh formatter properties, parsed schemas, a mock registry client and a formatter. */
    @Before
    public void setUp() {
        props = new Properties();
        props.put("schema.registry.url", TEST_URL);
        Schema.Parser parser = new Schema.Parser();
        recordSchema = parser.parse(RECORD_SCHEMA_STRING);
        intSchema = parser.parse(INT_SCHEMA_STRING);
        schemaRegistry = MockSchemaRegistry.getClientForScope(TEST_SCOPE);
        formatter = new AvroMessageFormatter(TEST_URL, null);
    }

    /**
     * Drops every mock registry scope this class touches.
     *
     * <p>Several tests register subjects in the {@code "foo"} scope; dropping it here as well
     * keeps those registrations from carrying over into whichever test runs next.
     */
    @After
    public void tearDown() {
        MockSchemaRegistry.dropScope(TEST_SCOPE);
        MockSchemaRegistry.dropScope(FOO_SCOPE);
    }

    /** A value-only JSON line survives the reader/formatter round trip unchanged. */
    @Test
    public void testKafkaAvroValueFormatter() {
        formatter.init(props);
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(USER_JSON), false, true, false);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(formatter, consumed(TOPIC, null, message.value()));

        Assert.assertEquals("Input value json should match output value json", USER_JSON, outputJson);
    }

    /** With {@code print.key} enabled, an int key and record value round-trip as {@code key\tvalue}. */
    @Test
    public void testKafkaAvroKeyValueFormatter() {
        props.put("print.key", "true");
        formatter.init(props);
        String inputJson = "10\t" + USER_JSON;
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, intSchema, recordSchema, TOPIC, true, readerOver(inputJson), false, true, false);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(formatter, consumed(TOPIC, message.key(), message.value()));

        Assert.assertEquals("Input key/value json should match output key/value json", inputJson, outputJson);
    }

    /** With {@code print.timestamp} enabled, the output is prefixed with {@code <type name>:<timestamp>\t}. */
    @Test
    public void testKafkaAvroValueWithTimestampFormatter() {
        props.put("print.timestamp", "true");
        formatter.init(props);
        String expectedJson = String.format("%s:%d\t%s", TIMESTAMP_TYPE.name, TIMESTAMP, USER_JSON);
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(USER_JSON), false, true, false);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(formatter, consumed(TOPIC, null, message.value()));

        Assert.assertEquals("Input value json should match output value json", expectedJson, outputJson);
    }

    /**
     * JSON whose field name is not part of the record schema must make {@code readMessage()}
     * throw a {@link SerializationException} caused by an {@link AvroRuntimeException}.
     */
    @Test
    public void testInvalidFormat() {
        String inputJson = "{\"invalid-field-name\":\"myname\"}\n";
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(inputJson), false, true, false);
        try {
            avroReader.readMessage();
            Assert.fail("Registering an invalid schema should fail");
        } catch (SerializationException e) {
            Assert.assertTrue("The cause of the exception should be avro", e.getCause() instanceof AvroRuntimeException);
        }
    }

    /** A formatter built with a {@link StringDeserializer} prints a raw string key before the Avro value. */
    @Test
    public void testStringKey() {
        props.put("print.key", "true");
        formatter = new AvroMessageFormatter(TEST_URL, new StringDeserializer());
        formatter.init(props);
        String expectedJson = STRING_KEY + "\t" + USER_JSON;
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(USER_JSON), false, true, false);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
        // The key never passes through the Avro reader; it is attached as plain string bytes.
        byte[] serializedKey = STRING_KEY.getBytes(StandardCharsets.UTF_8);

        String outputJson = format(formatter, consumed(TOPIC, serializedKey, message.value()));

        Assert.assertEquals("Input key/value json should match output key/value json", expectedJson, outputJson);
    }

    /** Same as {@link #testStringKey()} but with {@code print.timestamp} enabled as well. */
    @Test
    public void testStringKeyWithTimestamp() {
        props.put("print.key", "true");
        props.put("print.timestamp", "true");
        formatter = new AvroMessageFormatter(TEST_URL, new StringDeserializer());
        formatter.init(props);
        String expectedJson = String.format("%s:%d\tTestKey\t%s", TIMESTAMP_TYPE.name, TIMESTAMP, USER_JSON);
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(USER_JSON), false, true, false);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();
        byte[] serializedKey = STRING_KEY.getBytes(StandardCharsets.UTF_8);

        String outputJson = format(formatter, consumed(TOPIC, serializedKey, message.value()));

        Assert.assertEquals("Input key/value json should match output key/value json", expectedJson, outputJson);
    }

    /**
     * Round trip with the schema registered up front under {@code topic1-value}.
     *
     * <p>The reader's last two boolean constructor arguments are {@code false, true} here rather
     * than {@code true, false} as in the other tests; presumably that disables auto-registration
     * and enables use-latest-version. NOTE(review): confirm against the AvroMessageReader
     * constructor.
     */
    @Test
    public void testKafkaAvroValueUsingLatestVersion() throws Exception {
        formatter.init(props);
        schemaRegistry.register(TOPIC + "-value", new AvroSchema(recordSchema));
        AvroMessageReader avroReader = new AvroMessageReader(
                TEST_URL, null, recordSchema, TOPIC, false, readerOver(USER_JSON), false, false, true);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(formatter, consumed(TOPIC, null, message.value()));

        Assert.assertEquals("Input value json should match output value json", USER_JSON, outputJson);
    }

    /**
     * With {@link TopicRecordNameStrategy} and auto-registration disabled, reading fails while
     * only {@code mytopic-value} is registered and succeeds once the schema is registered under
     * {@code mytopic-<record full name>}.
     */
    @Test
    public void testUsingTopicRecordNameStrategy() throws Exception {
        Map<String, String> propertyMap = mockRegistryProperties();
        propertyMap.put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName());
        propertyMap.put("value.schema", RECORD_SCHEMA_STRING);
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(propertyMap);
        SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry.getClientForScope(FOO_SCOPE);
        schemaRegistryClient.register(CONFIGURED_TOPIC + "-value", new AvroSchema(recordSchema));

        ByteArrayInputStream is = new ByteArrayInputStream(USER_JSON.getBytes(StandardCharsets.UTF_8));
        AvroMessageReader avroReader = initializedReader(is, propertyMap);
        try {
            avroReader.readMessage();
            Assert.fail("Expected exception was not thrown. Exception should have been thrown due to schema not present in the mock schema registry with the TopicRecordNameStrategy, and auto-register disabled.");
        } catch (SerializationException e) {
            Assert.assertTrue(e.getMessage().contains("Error retrieving Avro schema"));
        }

        schemaRegistryClient.register(CONFIGURED_TOPIC + "-" + recordSchema.getFullName(), new AvroSchema(recordSchema));
        // Rewind the input so the same line can be read again after the failed attempt.
        is.reset();
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(avroMessageFormatter, consumed(TOPIC, null, message.value()));

        Assert.assertEquals("Input value json should match output value json", USER_JSON, outputJson);
    }

    /** Property-configured reader/formatter round trip of a record key and record value. */
    @Test
    public void testUsingSubjectNameStrategy() throws Exception {
        Map<String, String> propertyMap = mockRegistryProperties();
        propertyMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        propertyMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        propertyMap.put("parse.key", "true");
        propertyMap.put("print.key", "true");
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(propertyMap);
        registerKeyAndValueSchemas(MockSchemaRegistry.getClientForScope(FOO_SCOPE));

        String inputJson = "{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n";
        AvroMessageReader avroReader = initializedReader(
                new ByteArrayInputStream(inputJson.getBytes(StandardCharsets.UTF_8)), propertyMap);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String outputJson = format(avroMessageFormatter, consumed(CONFIGURED_TOPIC, message.key(), message.value()));

        Assert.assertEquals("Input value json should match output value json", inputJson, outputJson);
    }

    /** Headers parsed by the reader are carried on the record and printed back by the formatter. */
    @Test
    public void testUsingHeaders() throws Exception {
        Map<String, String> propertyMap = mockRegistryProperties();
        propertyMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        propertyMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        propertyMap.put("parse.key", "true");
        propertyMap.put("parse.headers", "true");
        propertyMap.put("print.key", "true");
        propertyMap.put("print.headers", "true");
        propertyMap.put("headers.deserializer", StringDeserializer.class.getName());
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(propertyMap);
        registerKeyAndValueSchemas(MockSchemaRegistry.getClientForScope(FOO_SCOPE));

        String input = "headerKey0:headerValue0,headerKey1:headerValue\t{\"key_field\":\"1\"}\t{\"value_field\":\"1\"}\n";
        AvroMessageReader avroReader = initializedReader(
                new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), propertyMap);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String output = format(avroMessageFormatter, consumedWithHeaders(CONFIGURED_TOPIC, message));

        Assert.assertEquals("Input value should match output value", input, output);
    }

    /**
     * Header values, key and value written as {@code <NULL>} round-trip as {@code <NULL>} when
     * both {@code null.marker} and {@code null.literal} are set to that text.
     */
    @Test
    public void testUsingNull() throws Exception {
        Map<String, String> propertyMap = mockRegistryProperties();
        propertyMap.put("value.schema", RECORD_VALUE_SCHEMA_STRING);
        propertyMap.put("key.schema", RECORD_KEY_SCHEMA_STRING);
        propertyMap.put("parse.key", "true");
        propertyMap.put("parse.headers", "true");
        propertyMap.put("null.marker", "<NULL>");
        propertyMap.put("print.key", "true");
        propertyMap.put("print.headers", "true");
        propertyMap.put("null.literal", "<NULL>");
        propertyMap.put("headers.deserializer", StringDeserializer.class.getName());
        AvroMessageFormatter avroMessageFormatter = new AvroMessageFormatter();
        avroMessageFormatter.configure(propertyMap);
        registerKeyAndValueSchemas(MockSchemaRegistry.getClientForScope(FOO_SCOPE));

        String input = "headerKey0:<NULL>,headerKey1:<NULL>\t<NULL>\t<NULL>\n";
        AvroMessageReader avroReader = initializedReader(
                new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)), propertyMap);
        ProducerRecord<byte[], byte[]> message = avroReader.readMessage();

        String output = format(avroMessageFormatter, consumedWithHeaders(CONFIGURED_TOPIC, message));

        Assert.assertEquals("Input value should match output value", input, output);
    }

    /** Wraps {@code text} in a UTF-8 {@link BufferedReader}, standing in for console input. */
    private static BufferedReader readerOver(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8));
    }

    /** Returns the properties shared by every test that targets the {@code mock://foo} registry. */
    private static Map<String, String> mockRegistryProperties() {
        Map<String, String> propertyMap = new HashMap<>();
        propertyMap.put("topic", CONFIGURED_TOPIC);
        propertyMap.put("schema.registry.url", FOO_URL);
        propertyMap.put("auto.register.schemas", "false");
        return propertyMap;
    }

    /** Creates a no-arg {@link AvroMessageReader} and initializes it from {@code input} and {@code config}. */
    private static AvroMessageReader initializedReader(InputStream input, Map<String, String> config) {
        Properties properties = new Properties();
        properties.putAll(config);
        AvroMessageReader avroReader = new AvroMessageReader();
        avroReader.init(input, properties);
        return avroReader;
    }

    /** Registers the key and value record schemas under {@code mytopic-key} and {@code mytopic-value}. */
    private static void registerKeyAndValueSchemas(SchemaRegistryClient client) throws Exception {
        Schema.Parser parser = new Schema.Parser();
        Schema keySchema = parser.parse(RECORD_KEY_SCHEMA_STRING);
        Schema valueSchema = parser.parse(RECORD_VALUE_SCHEMA_STRING);
        client.register(CONFIGURED_TOPIC + "-key", new AvroSchema(keySchema));
        client.register(CONFIGURED_TOPIC + "-value", new AvroSchema(valueSchema));
    }

    /**
     * Builds the consumer record handed to the formatter: partition 0, fixed offset, timestamp
     * and timestamp type, checksum 0.
     *
     * @param key serialized key, or {@code null} for a key-less record (size reported as 0)
     * @param value serialized value; must not be {@code null}
     */
    private static ConsumerRecord<byte[], byte[]> consumed(String topic, byte[] key, byte[] value) {
        int keySize = key == null ? 0 : key.length;
        return new ConsumerRecord<>(
                topic, 0, OFFSET, TIMESTAMP, TIMESTAMP_TYPE, 0L, keySize, value.length, key, value);
    }

    /**
     * Builds a consumer record that carries the key, value and headers of {@code produced}.
     * A {@code null} key or value is reported with a serialized size of 0.
     */
    private static ConsumerRecord<byte[], byte[]> consumedWithHeaders(
            String topic, ProducerRecord<byte[], byte[]> produced) {
        byte[] key = produced.key();
        byte[] value = produced.value();
        int keySize = key == null ? 0 : key.length;
        int valueSize = value == null ? 0 : value.length;
        return new ConsumerRecord<>(
                topic, 0, OFFSET, TIMESTAMP, TIMESTAMP_TYPE, keySize, valueSize, key, value,
                produced.headers(), Optional.empty());
    }

    /** Runs {@code record} through {@code messageFormatter} and returns everything it printed, decoded as UTF-8. */
    private static String format(AvroMessageFormatter messageFormatter, ConsumerRecord<byte[], byte[]> record) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (PrintStream out = new PrintStream(sink, true, StandardCharsets.UTF_8.name())) {
            messageFormatter.writeTo(record, out);
        } catch (UnsupportedEncodingException e) {
            // Cannot happen in practice: every JVM is required to support UTF-8.
            throw new IllegalStateException("UTF-8 charset is unavailable", e);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}

