package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlTopic;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.avro.KsqlAvroTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.clients.mapr.util.MapRTopicUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsBuilder;

/* loaded from: input_file:io/confluent/ksql/planner/plan/KsqlStructuredDataOutputNode.class */
/**
 * Output plan node that sends the rows produced by its source node into a Kafka topic.
 *
 * <p>{@link #buildStream} builds the source stream, creates the sink topic through the supplied
 * {@link KafkaTopicClient}, and then calls {@code into(...)} on the resulting stream with the
 * serde obtained from this node's {@link KsqlTopic}.
 */
public class KsqlStructuredDataOutputNode extends OutputNode {
    /** Config / output-property key holding the sink topic partition count. */
    private static final String SINK_PARTITIONS_PROPERTY = "ksql.sink.partitions";
    /** Config / output-property key holding the sink topic replica count. */
    private static final String SINK_REPLICAS_PROPERTY = "ksql.sink.replicas";
    /** Config key whose value is passed to MapRTopicUtils when decorating the sink topic name. */
    private static final String DEFAULT_STREAM_PROPERTY = "ksql.default.stream";
    /** Output-property key naming the column used to re-key the output stream. */
    private static final String PARTITION_BY_PROPERTY = "PARTITION_BY";

    private final String kafkaTopicName;
    private final KsqlTopic ksqlTopic;
    private final Field keyField;
    private final Field timestampField;
    private final Map<String, Object> outputProperties;

    /**
     * Mutable builder used to create modified copies of a {@link KsqlStructuredDataOutputNode}.
     *
     * <p>Every {@code withXxx} method stores its argument as-is and returns this builder.
     */
    public static class Builder {
        private PlanNodeId id;
        private PlanNode source;
        private Schema schema;
        private Field timestampField;
        private Field keyField;
        private KsqlTopic ksqlTopic;
        private String topicName;
        private Map<String, Object> outputProperties;
        private Optional<Integer> limit;

        /**
         * Creates a node from the values currently held by this builder.
         *
         * @return a new output node
         */
        public KsqlStructuredDataOutputNode build() {
            return new KsqlStructuredDataOutputNode(
                    this.id,
                    this.source,
                    this.schema,
                    this.timestampField,
                    this.keyField,
                    this.ksqlTopic,
                    this.topicName,
                    this.outputProperties,
                    this.limit);
        }

        /**
         * Creates a builder pre-populated with every property of an existing node.
         *
         * @param original the node whose properties are copied (by reference)
         * @return a new builder initialised from {@code original}
         */
        public static Builder from(KsqlStructuredDataOutputNode original) {
            return new Builder()
                    .withId(original.getId())
                    .withSource(original.getSource())
                    .withSchema(original.getSchema())
                    .withTimestampField(original.getTimestampField())
                    .withKeyField(original.getKeyField())
                    .withKsqlTopic(original.getKsqlTopic())
                    .withTopicName(original.getKafkaTopicName())
                    .withOutputProperties(original.getOutputProperties())
                    .withLimit(original.getLimit());
        }

        Builder withLimit(Optional<Integer> limit) {
            this.limit = limit;
            return this;
        }

        Builder withOutputProperties(Map<String, Object> outputProperties) {
            this.outputProperties = outputProperties;
            return this;
        }

        Builder withTopicName(String topicName) {
            this.topicName = topicName;
            return this;
        }

        Builder withKsqlTopic(KsqlTopic ksqlTopic) {
            this.ksqlTopic = ksqlTopic;
            return this;
        }

        Builder withKeyField(Field keyField) {
            this.keyField = keyField;
            return this;
        }

        Builder withTimestampField(Field timestampField) {
            this.timestampField = timestampField;
            return this;
        }

        Builder withSchema(Schema schema) {
            this.schema = schema;
            return this;
        }

        Builder withSource(PlanNode source) {
            this.source = source;
            return this;
        }

        Builder withId(PlanNodeId id) {
            this.id = id;
            return this;
        }
    }

    /**
     * Creates an output node; also used by Jackson for deserialization.
     *
     * @param id               identifier of this plan node
     * @param source           the upstream plan node
     * @param schema           schema of this node
     * @param timestampField   timestamp field (JSON property {@code timestamp})
     * @param keyField         key field (JSON property {@code key})
     * @param ksqlTopic        the KSQL topic describing the sink and its serde
     * @param topicName        name of the Kafka topic to write to
     * @param outputProperties properties of the sink; read for partition, replica and
     *                         partition-by settings when the stream is built
     * @param limit            optional limit, passed through to {@link OutputNode}
     */
    @JsonCreator
    public KsqlStructuredDataOutputNode(
            @JsonProperty("id") PlanNodeId id,
            @JsonProperty("source") PlanNode source,
            @JsonProperty("schema") Schema schema,
            @JsonProperty("timestamp") Field timestampField,
            @JsonProperty("key") Field keyField,
            @JsonProperty("ksqlTopic") KsqlTopic ksqlTopic,
            @JsonProperty("topicName") String topicName,
            @JsonProperty("outputProperties") Map<String, Object> outputProperties,
            @JsonProperty("limit") Optional<Integer> limit) {
        super(id, source, schema, limit);
        this.kafkaTopicName = topicName;
        this.keyField = keyField;
        this.timestampField = timestampField;
        this.ksqlTopic = ksqlTopic;
        this.outputProperties = outputProperties;
    }

    /** @return the name of the Kafka topic this node writes to */
    public String getKafkaTopicName() {
        return this.kafkaTopicName;
    }

    /** @return the key field this node was constructed with */
    @Override
    public Field getKeyField() {
        return this.keyField;
    }

    /**
     * Builds the source stream, creates the sink topic and routes the stream into it.
     *
     * <p>Side effect: when the output properties contain {@code ksql.sink.partitions} or
     * {@code ksql.sink.replicas}, those values are written into the supplied {@code ksqlConfig}
     * before the sink topic is created.
     * NOTE(review): the config instance is mutated in place, so the overrides stay visible to
     * whoever else holds it - confirm this is intended.
     *
     * @return the output stream, with its output node set to a copy of this node whose schema
     *         has the implicit row-time / row-key columns re-added
     * @throws KsqlException if a {@code PARTITION_BY} column is not part of the result schema
     */
    @Override
    public SchemaKStream buildStream(
            StreamsBuilder streamsBuilder,
            KsqlConfig ksqlConfig,
            KafkaTopicClient kafkaTopicClient,
            MetastoreUtil metastoreUtil,
            FunctionRegistry functionRegistry,
            Map<String, Object> props,
            SchemaRegistryClient schemaRegistryClient) {
        SchemaKStream sourceStream = getSource().buildStream(
                streamsBuilder,
                ksqlConfig,
                kafkaTopicClient,
                metastoreUtil,
                functionRegistry,
                props,
                schemaRegistryClient);
        Set<Integer> rowTimeRowKeyIndexes = SchemaUtil.getRowTimeRowKeyIndexes(getSchema());

        // Work on a copy of this node whose schema has the implicit columns stripped.
        Builder outputNodeBuilder = Builder.from(this);
        outputNodeBuilder.withSchema(SchemaUtil.removeImplicitRowTimeRowKeyFromSchema(getSchema()));
        if (getTopicSerde() instanceof KsqlAvroTopicSerDe) {
            addAvroSchemaToResultTopic(outputNodeBuilder);
        }

        Map<String, Object> sinkProperties = getOutputProperties();
        overrideConfigIfPresent(ksqlConfig, sinkProperties, SINK_PARTITIONS_PROPERTY);
        overrideConfigIfPresent(ksqlConfig, sinkProperties, SINK_REPLICAS_PROPERTY);

        // May register a new key field on the builder (PARTITION_BY), so build() comes after it.
        SchemaKStream outputStream = createOutputStream(
                sourceStream, outputNodeBuilder, functionRegistry, sinkProperties, schemaRegistryClient);
        KsqlStructuredDataOutputNode nodeWithoutImplicitColumns = outputNodeBuilder.build();

        createSinkTopic(
                nodeWithoutImplicitColumns.getKafkaTopicName(),
                ksqlConfig,
                kafkaTopicClient,
                shouldBeCompacted(outputStream));
        outputStream.into(
                nodeWithoutImplicitColumns.getKafkaTopicName(),
                nodeWithoutImplicitColumns.getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(
                        nodeWithoutImplicitColumns.getSchema(), ksqlConfig, false, schemaRegistryClient),
                rowTimeRowKeyIndexes);
        outputStream.setOutputNode(
                outputNodeBuilder
                        .withSchema(SchemaUtil.addImplicitRowTimeRowKeyToSchema(
                                nodeWithoutImplicitColumns.getSchema()))
                        .build());
        return outputStream;
    }

    /** Copies {@code key} from the sink properties into the config when the property is set. */
    private static void overrideConfigIfPresent(
            KsqlConfig ksqlConfig, Map<String, Object> sinkProperties, String key) {
        if (sinkProperties.containsKey(key)) {
            ksqlConfig.put(key, sinkProperties.get(key));
        }
    }

    /** Returns true only for a {@link SchemaKTable} that is not windowed. */
    private boolean shouldBeCompacted(SchemaKStream stream) {
        if (!(stream instanceof SchemaKTable)) {
            return false;
        }
        return !((SchemaKTable) stream).isWindowed();
    }

    /**
     * Derives the stream that is written to the sink from the source stream.
     *
     * <p>A {@link SchemaKTable} is returned unchanged. Any other stream is wrapped in a new
     * {@code SINK}-typed {@link SchemaKStream} using this node's schema and key field; when the
     * sink properties contain {@code PARTITION_BY}, the named column is set as key field on
     * {@code outputNodeBuilder} and the wrapped stream is re-keyed on it.
     *
     * @throws KsqlException if the {@code PARTITION_BY} column is not in the stream's schema
     */
    private SchemaKStream createOutputStream(
            SchemaKStream sourceStream,
            Builder outputNodeBuilder,
            FunctionRegistry functionRegistry,
            Map<String, Object> sinkProperties,
            SchemaRegistryClient schemaRegistryClient) {
        if (sourceStream instanceof SchemaKTable) {
            return sourceStream;
        }
        SchemaKStream sinkStream = new SchemaKStream(
                getSchema(),
                sourceStream.getKstream(),
                getKeyField(),
                Collections.singletonList(sourceStream),
                SchemaKStream.Type.SINK,
                functionRegistry,
                schemaRegistryClient);
        if (!sinkProperties.containsKey(PARTITION_BY_PROPERTY)) {
            return sinkStream;
        }
        String partitionByColumn = sinkProperties.get(PARTITION_BY_PROPERTY).toString();
        Field newKeyField = (Field) SchemaUtil.getFieldByName(sinkStream.getSchema(), partitionByColumn)
                .orElseThrow(() -> new KsqlException(String.format(
                        "Column %s does not exist in the result schema. Error in Partition By clause.",
                        partitionByColumn)));
        outputNodeBuilder.withKeyField(newKeyField);
        return sinkStream.selectKey(newKeyField, false);
    }

    /**
     * Replaces the builder's topic with a copy of this node's topic (same name and Kafka topic
     * name) that carries a freshly created {@link KsqlAvroTopicSerDe}.
     */
    private void addAvroSchemaToResultTopic(Builder outputNodeBuilder) {
        KsqlTopic currentTopic = getKsqlTopic();
        outputNodeBuilder.withKsqlTopic(
                new KsqlTopic(currentTopic.getName(), currentTopic.getKafkaTopicName(), new KsqlAvroTopicSerDe()));
    }

    /**
     * Creates the sink topic using the partition and replica counts held in the config.
     *
     * <p>The counts are read through {@link Number} rather than hard casts to {@code Integer}
     * and {@code Short}: {@link #buildStream} copies arbitrary {@code Object} values from the
     * output properties into the config, so the stored boxed type is not guaranteed.
     *
     * @param topicName        undecorated sink topic name
     * @param ksqlConfig       source of the partition, replica and default-stream settings
     * @param kafkaTopicClient client used to create the topic
     * @param compacted        passed through as the last argument of {@code createTopic}
     */
    private void createSinkTopic(
            String topicName, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, boolean compacted) {
        int partitions = ((Number) ksqlConfig.get(SINK_PARTITIONS_PROPERTY)).intValue();
        short replicas = ((Number) ksqlConfig.get(SINK_REPLICAS_PROPERTY)).shortValue();
        String defaultStream = (String) ksqlConfig.get(DEFAULT_STREAM_PROPERTY);

        // Kept as a mutable list: whether the MapR helper modifies its argument is not visible here.
        ArrayList<String> topics = new ArrayList<>();
        topics.add(topicName);
        String decoratedTopicName =
                (String) MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(topics, defaultStream).get(0);
        kafkaTopicClient.createTopic(decoratedTopicName, partitions, replicas, compacted);
    }

    /** @return the timestamp field this node was constructed with */
    public Field getTimestampField() {
        return this.timestampField;
    }

    /** @return the KSQL topic describing the sink */
    public KsqlTopic getKsqlTopic() {
        return this.ksqlTopic;
    }

    /** @return the sink properties this node was constructed with (not copied) */
    public Map<String, Object> getOutputProperties() {
        return this.outputProperties;
    }

    /** @return the serde of this node's KSQL topic */
    public KsqlTopicSerDe getTopicSerde() {
        return this.ksqlTopic.getKsqlTopicSerDe();
    }
}
