/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.util;

import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.parser.SqlFormatter;
import io.confluent.ksql.parser.tree.AbstractStreamCreateStatement;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.Node;
import io.confluent.ksql.parser.tree.StringLiteral;
import io.confluent.ksql.parser.tree.TableElement;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.SchemaUtil;
import io.confluent.ksql.util.SerDeUtil;
import io.confluent.ksql.util.StringUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.kafka.connect.data.Field;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroUtil {
    private static final Logger log = LoggerFactory.getLogger(AvroUtil.class);

    public Pair<AbstractStreamCreateStatement, String> checkAndSetAvroSchema(AbstractStreamCreateStatement abstractStreamCreateStatement, Map<String, Object> streamsProperties, SchemaRegistryClient schemaRegistryClient) {
        Map ddlProperties = abstractStreamCreateStatement.getProperties();
        if (!ddlProperties.containsKey("VALUE_FORMAT")) {
            throw new KsqlException(String.format("%s should be set in WITH clause of CREATE STREAM/TABLE statement.", "VALUE_FORMAT"));
        }
        String serde = StringUtil.cleanQuotes(((Expression)ddlProperties.get("VALUE_FORMAT")).toString());
        if (!serde.equalsIgnoreCase("AVRO")) {
            return new Pair((Object)abstractStreamCreateStatement, null);
        }
        String kafkaTopicName = StringUtil.cleanQuotes(((Expression)ddlProperties.get("KAFKA_TOPIC")).toString());
        try {
            if (abstractStreamCreateStatement.getElements().isEmpty()) {
                SchemaMetadata schemaMetadata = this.fetchSchemaMetadata(abstractStreamCreateStatement, schemaRegistryClient, kafkaTopicName);
                String avroSchemaString = schemaMetadata.getSchema();
                streamsProperties.put("AVROSCHEMA", avroSchemaString);
                org.apache.kafka.connect.data.Schema schema = SerDeUtil.getSchemaFromAvro(avroSchemaString);
                AbstractStreamCreateStatement abstractStreamCreateStatementCopy = this.addAvroFields(abstractStreamCreateStatement, schema, schemaMetadata.getId());
                return new Pair((Object)abstractStreamCreateStatementCopy, (Object)SqlFormatter.formatSql((Node)abstractStreamCreateStatementCopy));
            }
            return new Pair((Object)abstractStreamCreateStatement, null);
        }
        catch (Exception e) {
            String errorMessage = String.format(" Could not fetch the AVRO schema from schema registry. %s ", e.getMessage());
            throw new KsqlException(errorMessage);
        }
    }

    private SchemaMetadata fetchSchemaMetadata(AbstractStreamCreateStatement abstractStreamCreateStatement, SchemaRegistryClient schemaRegistryClient, String kafkaTopicName) throws IOException, RestClientException {
        if (abstractStreamCreateStatement.getProperties().containsKey("AVRO_SCHEMA_ID")) {
            int schemaId;
            try {
                schemaId = Integer.parseInt(StringUtil.cleanQuotes(((Expression)abstractStreamCreateStatement.getProperties().get("AVRO_SCHEMA_ID")).toString()));
            }
            catch (NumberFormatException e) {
                throw new KsqlException(String.format("Invalid schema id property: %s.", ((Expression)abstractStreamCreateStatement.getProperties().get("AVRO_SCHEMA_ID")).toString()));
            }
            return schemaRegistryClient.getSchemaMetadata(kafkaTopicName + "-value", schemaId);
        }
        return schemaRegistryClient.getLatestSchemaMetadata(kafkaTopicName + "-value");
    }

    private AbstractStreamCreateStatement addAvroFields(AbstractStreamCreateStatement abstractStreamCreateStatement, org.apache.kafka.connect.data.Schema schema, int schemaId) {
        ArrayList<TableElement> elements = new ArrayList<TableElement>();
        for (Field field : schema.fields()) {
            TableElement tableElement = new TableElement(field.name().toUpperCase(), SchemaUtil.getSQLTypeName((org.apache.kafka.connect.data.Schema)field.schema()));
            elements.add(tableElement);
        }
        StringLiteral schemaIdLiteral = new StringLiteral(String.format("%d", schemaId));
        HashMap<String, StringLiteral> properties = new HashMap<String, StringLiteral>(abstractStreamCreateStatement.getProperties());
        if (!abstractStreamCreateStatement.getProperties().containsKey("AVRO_SCHEMA_ID")) {
            properties.put("AVRO_SCHEMA_ID", schemaIdLiteral);
        }
        return abstractStreamCreateStatement.copyWith(elements, properties);
    }

    public void validatePersistentQueryResults(PersistentQueryMetadata persistentQueryMetadata, SchemaRegistryClient schemaRegistryClient) {
        if (persistentQueryMetadata.getResultTopicSerde() == DataSource.DataSourceSerDe.AVRO) {
            String avroSchemaString = SchemaUtil.buildAvroSchema((org.apache.kafka.connect.data.Schema)persistentQueryMetadata.getResultSchema(), (String)persistentQueryMetadata.getResultTopic().getName());
            boolean isValidSchema = this.isValidAvroSchemaForTopic(persistentQueryMetadata.getResultTopic().getTopicName(), avroSchemaString, schemaRegistryClient);
            if (!isValidSchema) {
                throw new KsqlException(String.format("Cannot register avro schema for %s since it is not valid for schema registry.", persistentQueryMetadata.getResultTopic().getKafkaTopicName()));
            }
        }
    }

    private boolean isValidAvroSchemaForTopic(String topicName, String avroSchemaString, SchemaRegistryClient schemaRegistryClient) {
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(avroSchemaString);
        try {
            return schemaRegistryClient.testCompatibility(topicName, avroSchema);
        }
        catch (IOException e) {
            String errorMessage = String.format("Could not check Schema compatibility: %s", e.getMessage());
            log.error(errorMessage, (Throwable)e);
            throw new KsqlException(errorMessage);
        }
        catch (RestClientException e) {
            String errorMessage = String.format("Could not connect to Schema Registry service: %s", e.getMessage());
            log.error(errorMessage, (Throwable)e);
            throw new KsqlException(errorMessage);
        }
    }
}

