/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.formatter.protobuf;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.formatter.protobuf.ProtobufMessageFormatter;
import io.confluent.kafka.formatter.protobuf.ProtobufMessageReader;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Round-trip tests for {@link ProtobufMessageReader} and {@link ProtobufMessageFormatter}.
 *
 * <p>Each positive test feeds a JSON line to the reader, takes the serialized bytes it
 * produces, wraps them in a {@link ConsumerRecord}, hands that record to the formatter and
 * checks that the JSON written by the formatter is equivalent to the JSON that went in.
 * All tests run against a {@link MockSchemaRegistryClient}.
 */
public class KafkaProtobufFormatterTest {
    private static final String TOPIC = "topic1";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private Properties props;
    private ProtobufMessageFormatter formatter;
    private ProtobufSchema recordSchema;
    private ProtobufSchema enumSchema;
    private ProtobufSchema keySchema;
    private SchemaRegistryClient schemaRegistry;

    /** Builds fresh schemas, a fresh mock registry and a fresh formatter for every test. */
    @Before
    public void setUp() {
        this.props = new Properties();
        this.props.put("schema.registry.url", "bogus");

        String userProto = "syntax = \"proto3\"; message User { string name = 1; }";
        this.recordSchema = new ProtobufSchema(userProto);

        String enumProto = "syntax = \"proto3\"; message ConfluentDefault1 {enum Suit {SPADES = 0; HEARTS = 1; DIAMONDS = 2; CLUBS = 4;} Suit c1 = 1;}";
        this.enumSchema = new ProtobufSchema(enumProto);

        String keyProto = "syntax = \"proto3\"; message Key { int32 key = 1; }";
        this.keySchema = new ProtobufSchema(keyProto);

        this.schemaRegistry = new MockSchemaRegistryClient();
        this.formatter = new ProtobufMessageFormatter(this.schemaRegistry, null);
    }

    /** A value-only message must come back out of the formatter as equivalent JSON. */
    @Test
    public void testKafkaProtobufValueFormatter() throws Exception {
        this.formatter.init(this.props);
        String inputJson = "{\"name\":\"myname\"}\n";
        ProtobufMessageReader protobufReader = new ProtobufMessageReader(this.schemaRegistry, null, this.recordSchema, TOPIC, false, readerOf(inputJson), true, false);
        ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();

        String outputJson = format(null, message.value());

        Assert.assertEquals("Input value json should match output value json", OBJECT_MAPPER.readTree(inputJson), OBJECT_MAPPER.readTree(outputJson));
    }

    /** A value-only message with an enum field must come back out as equivalent JSON. */
    @Test
    public void testKafkaProtobufEnumValueFormatter() throws Exception {
        this.formatter.init(this.props);
        String inputJson = "{\"c1\":\"SPADES\"}\n";
        ProtobufMessageReader protobufReader = new ProtobufMessageReader(this.schemaRegistry, null, this.enumSchema, TOPIC, false, readerOf(inputJson), true, false);
        ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();

        String outputJson = format(null, message.value());

        Assert.assertEquals("Input value json should match output value json", OBJECT_MAPPER.readTree(inputJson), OBJECT_MAPPER.readTree(outputJson));
    }

    /**
     * With {@code print.key=true} both the key and the value must round-trip.
     *
     * <p>The input and output lines each hold two JSON documents separated by a tab.
     * {@code ObjectMapper.readTree(String)} parses only the first document by default, so the
     * two halves are compared individually; comparing the whole lines would silently skip
     * the value.
     */
    @Test
    public void testKafkaProtobufKeyValueFormatter() throws Exception {
        this.props.put("print.key", "true");
        this.formatter.init(this.props);
        String inputJson = "{\"key\":10}\t{\"name\":\"myname\"}\n";
        ProtobufMessageReader protobufReader = new ProtobufMessageReader(this.schemaRegistry, this.keySchema, this.recordSchema, TOPIC, true, readerOf(inputJson), true, false);
        ProducerRecord<byte[], byte[]> message = protobufReader.readMessage();

        String outputJson = format(message.key(), message.value());

        String[] expectedParts = inputJson.split("\t", 2);
        String[] actualParts = outputJson.split("\t", 2);
        Assert.assertEquals("Output should contain a key and a value separated by a tab", 2, actualParts.length);
        Assert.assertEquals("Input key json should match output key json", OBJECT_MAPPER.readTree(expectedParts[0]), OBJECT_MAPPER.readTree(actualParts[0]));
        Assert.assertEquals("Input value json should match output value json", OBJECT_MAPPER.readTree(expectedParts[1]), OBJECT_MAPPER.readTree(actualParts[1]));
    }

    /**
     * Reading JSON whose field is not declared in the schema must fail with a
     * {@link SerializationException} caused by an {@link InvalidProtocolBufferException}.
     */
    @Test
    public void testInvalidFormat() {
        String inputJson = "{\"invalid-field-name\":\"myname\"}\n";
        ProtobufMessageReader protobufReader = new ProtobufMessageReader(this.schemaRegistry, null, this.recordSchema, TOPIC, false, readerOf(inputJson), true, false);
        try {
            protobufReader.readMessage();
            Assert.fail("Reading a message with an unknown field should fail");
        }
        catch (SerializationException e) {
            Assert.assertTrue("The cause of the exception should be protobuf", e.getCause() instanceof InvalidProtocolBufferException);
        }
    }

    /** Wraps {@code text} in a reader, encoding and decoding as UTF-8 regardless of platform default. */
    private static BufferedReader readerOf(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        return new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8));
    }

    /**
     * Runs the formatter under test on a single record and returns everything it printed.
     *
     * @param key serialized key, or {@code null} for a value-only record (key size is then 0)
     * @param value serialized value; must not be {@code null}
     * @return the formatter output decoded as UTF-8
     */
    private String format(byte[] key, byte[] value) throws Exception {
        int keySize = key == null ? 0 : key.length;
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TOPIC, 0, 200L, 1000L, TimestampType.LOG_APPEND_TIME, 0L, keySize, value.length, key, value);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Closing the PrintStream flushes any buffered characters into the sink before it is read.
        try (PrintStream out = new PrintStream(sink, false, StandardCharsets.UTF_8.name())) {
            this.formatter.writeTo(record, out);
        }
        return new String(sink.toByteArray(), StandardCharsets.UTF_8);
    }
}

