/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.mapr.util.MapRTopicUtils;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class KsqlStructuredDataOutputNode
extends OutputNode {
    private final String kafkaTopicName;
    private final KsqlTopic ksqlTopic;
    private final Field keyField;
    private final Field timestampField;
    private final Map<String, Object> outputProperties;

    @JsonCreator
    public KsqlStructuredDataOutputNode(@JsonProperty(value="id") PlanNodeId id, @JsonProperty(value="source") PlanNode source, @JsonProperty(value="schema") Schema schema, @JsonProperty(value="timestamp") Field timestampField, @JsonProperty(value="key") Field keyField, @JsonProperty(value="ksqlTopic") KsqlTopic ksqlTopic, @JsonProperty(value="topicName") String topicName, @JsonProperty(value="outputProperties") Map<String, Object> outputProperties, @JsonProperty(value="limit") Optional<Integer> limit) {
        super(id, source, schema, limit);
        this.kafkaTopicName = topicName;
        this.keyField = keyField;
        this.timestampField = timestampField;
        this.ksqlTopic = ksqlTopic;
        this.outputProperties = outputProperties;
    }

    public String getKafkaTopicName() {
        return this.kafkaTopicName;
    }

    @Override
    public Field getKeyField() {
        return this.keyField;
    }

    @Override
    public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> props, SchemaRegistryClient schemaRegistryClient) {
        Map<String, Object> outputProperties;
        SchemaKStream schemaKStream = this.getSource().buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props, schemaRegistryClient);
        Set rowkeyIndexes = SchemaUtil.getRowTimeRowKeyIndexes((Schema)this.getSchema());
        Builder outputNodeBuilder = Builder.from(this);
        Schema schema = SchemaUtil.removeImplicitRowTimeRowKeyFromSchema((Schema)this.getSchema());
        outputNodeBuilder.withSchema(schema);
        if (this.getTopicSerde() instanceof KsqlAvroTopicSerDe) {
            this.addAvroSchemaToResultTopic(outputNodeBuilder);
        }
        if ((outputProperties = this.getOutputProperties()).containsKey("ksql.sink.partitions")) {
            ksqlConfig.put("ksql.sink.partitions", outputProperties.get("ksql.sink.partitions"));
        }
        if (outputProperties.containsKey("ksql.sink.replicas")) {
            ksqlConfig.put("ksql.sink.replicas", outputProperties.get("ksql.sink.replicas"));
        }
        SchemaKStream result = this.createOutputStream(schemaKStream, outputNodeBuilder, functionRegistry, outputProperties, schemaRegistryClient);
        KsqlStructuredDataOutputNode noRowKey = outputNodeBuilder.build();
        this.createSinkTopic(noRowKey.getKafkaTopicName(), ksqlConfig, kafkaTopicClient, this.shoulBeCompacted(result));
        result.into(noRowKey.getKafkaTopicName(), (Serde<GenericRow>)noRowKey.getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(noRowKey.getSchema(), ksqlConfig, false, schemaRegistryClient), rowkeyIndexes);
        result.setOutputNode(outputNodeBuilder.withSchema(SchemaUtil.addImplicitRowTimeRowKeyToSchema((Schema)noRowKey.getSchema())).build());
        return result;
    }

    private boolean shoulBeCompacted(SchemaKStream result) {
        return result instanceof SchemaKTable && !((SchemaKTable)result).isWindowed();
    }

    private SchemaKStream createOutputStream(SchemaKStream schemaKStream, Builder outputNodeBuilder, FunctionRegistry functionRegistry, Map<String, Object> outputProperties, SchemaRegistryClient schemaRegistryClient) {
        if (schemaKStream instanceof SchemaKTable) {
            return schemaKStream;
        }
        SchemaKStream result = new SchemaKStream(this.getSchema(), (KStream<String, GenericRow>)schemaKStream.getKstream(), this.getKeyField(), Collections.singletonList(schemaKStream), SchemaKStream.Type.SINK, functionRegistry, schemaRegistryClient);
        if (outputProperties.containsKey("PARTITION_BY")) {
            String keyFieldName = outputProperties.get("PARTITION_BY").toString();
            Field keyField = (Field)SchemaUtil.getFieldByName((Schema)result.getSchema(), (String)keyFieldName).orElseThrow(() -> new KsqlException(String.format("Column %s does not exist in the result schema. Error in Partition By clause.", keyFieldName)));
            outputNodeBuilder.withKeyField(keyField);
            return result.selectKey(keyField, false);
        }
        return result;
    }

    private void addAvroSchemaToResultTopic(Builder builder) {
        KsqlAvroTopicSerDe ksqlAvroTopicSerDe = new KsqlAvroTopicSerDe();
        builder.withKsqlTopic(new KsqlTopic(this.getKsqlTopic().getName(), this.getKsqlTopic().getKafkaTopicName(), (KsqlTopicSerDe)ksqlAvroTopicSerDe));
    }

    private void createSinkTopic(String kafkaTopicName, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, boolean isCompacted) {
        int numberOfPartitions = (Integer)ksqlConfig.get("ksql.sink.partitions");
        short numberOfReplications = (Short)ksqlConfig.get("ksql.sink.replicas");
        String defaultStream = (String)ksqlConfig.get("ksql.default.stream");
        ArrayList<String> topicList = new ArrayList<String>();
        topicList.add(kafkaTopicName);
        String decoratedKafkaTopicName = (String)MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(topicList, (String)defaultStream).get(0);
        kafkaTopicClient.createTopic(decoratedKafkaTopicName, numberOfPartitions, numberOfReplications, isCompacted);
    }

    public Field getTimestampField() {
        return this.timestampField;
    }

    public KsqlTopic getKsqlTopic() {
        return this.ksqlTopic;
    }

    public Map<String, Object> getOutputProperties() {
        return this.outputProperties;
    }

    public KsqlTopicSerDe getTopicSerde() {
        return this.ksqlTopic.getKsqlTopicSerDe();
    }

    public static class Builder {
        private PlanNodeId id;
        private PlanNode source;
        private Schema schema;
        private Field timestampField;
        private Field keyField;
        private KsqlTopic ksqlTopic;
        private String topicName;
        private Map<String, Object> outputProperties;
        private Optional<Integer> limit;

        public KsqlStructuredDataOutputNode build() {
            return new KsqlStructuredDataOutputNode(this.id, this.source, this.schema, this.timestampField, this.keyField, this.ksqlTopic, this.topicName, this.outputProperties, this.limit);
        }

        public static Builder from(KsqlStructuredDataOutputNode original) {
            return new Builder().withId(original.getId()).withSource(original.getSource()).withSchema(original.getSchema()).withTimestampField(original.getTimestampField()).withKeyField(original.getKeyField()).withKsqlTopic(original.getKsqlTopic()).withTopicName(original.getKafkaTopicName()).withOutputProperties(original.getOutputProperties()).withLimit(original.getLimit());
        }

        Builder withLimit(Optional<Integer> limit) {
            this.limit = limit;
            return this;
        }

        Builder withOutputProperties(Map<String, Object> outputProperties) {
            this.outputProperties = outputProperties;
            return this;
        }

        Builder withTopicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        Builder withKsqlTopic(KsqlTopic ksqlTopic) {
            this.ksqlTopic = ksqlTopic;
            return this;
        }

        Builder withKeyField(Field keyField) {
            this.keyField = keyField;
            return this;
        }

        Builder withTimestampField(Field timestampField) {
            this.timestampField = timestampField;
            return this;
        }

        Builder withSchema(Schema schema) {
            this.schema = schema;
            return this;
        }

        Builder withSource(PlanNode source) {
            this.source = source;
            return this;
        }

        Builder withId(PlanNodeId id) {
            this.id = id;
            return this;
        }
    }
}

