package io.confluent.ksql.physical;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlStream;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.structured.QueuedSchemaKStream;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.timestamp.KsqlTimestampExtractor;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

/* loaded from: input_file:io/confluent/ksql/physical/PhysicalPlanBuilder.class */
public class PhysicalPlanBuilder {
    private final StreamsBuilder builder;
    private final KsqlConfig ksqlConfig;
    private final KafkaTopicClient kafkaTopicClient;
    private final MetastoreUtil metastoreUtil;
    private final FunctionRegistry functionRegistry;
    private final Map<String, Object> overriddenStreamsProperties;
    private final MetaStore metaStore;
    private final boolean updateMetastore;
    private final SchemaRegistryClient schemaRegistryClient;
    private final KafkaStreamsBuilder kafkaStreamsBuilder;

    public PhysicalPlanBuilder(StreamsBuilder streamsBuilder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> map, boolean z, MetaStore metaStore, SchemaRegistryClient schemaRegistryClient, KafkaStreamsBuilder kafkaStreamsBuilder) {
        this.builder = streamsBuilder;
        this.ksqlConfig = ksqlConfig;
        this.kafkaTopicClient = kafkaTopicClient;
        this.metastoreUtil = metastoreUtil;
        this.functionRegistry = functionRegistry;
        this.overriddenStreamsProperties = map;
        this.metaStore = metaStore;
        this.updateMetastore = z;
        this.schemaRegistryClient = schemaRegistryClient;
        this.kafkaStreamsBuilder = kafkaStreamsBuilder;
    }

    public PhysicalPlanBuilder(StreamsBuilder streamsBuilder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> map, boolean z, MetaStore metaStore, SchemaRegistryClient schemaRegistryClient) {
        this(streamsBuilder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, map, z, metaStore, schemaRegistryClient, new KafkaStreamsBuilderImpl());
    }

    public QueryMetadata buildPhysicalPlan(Pair<String, PlanNode> pair) throws Exception {
        SchemaKStream buildStream = ((PlanNode) pair.getRight()).buildStream(this.builder, this.ksqlConfig, this.kafkaTopicClient, this.metastoreUtil, this.functionRegistry, this.overriddenStreamsProperties, this.schemaRegistryClient);
        OutputNode outputNode = buildStream.outputNode();
        boolean z = outputNode instanceof KsqlBareOutputNode;
        if (z && !(buildStream instanceof QueuedSchemaKStream)) {
            throw new Exception(String.format("Mismatch between logical and physical output; expected a QueuedSchemaKStream based on logical KsqlBareOutputNode, found a %s instead", buildStream.getClass().getCanonicalName()));
        }
        String serviceId = getServiceId();
        String obj = this.ksqlConfig.get("ksql.persistent.prefix").toString();
        String obj2 = this.ksqlConfig.get("ksql.transient.prefix").toString();
        if (z) {
            return buildPlanForBareQuery((QueuedSchemaKStream) buildStream, (KsqlBareOutputNode) outputNode, serviceId, obj2, (String) pair.getLeft());
        }
        if (outputNode instanceof KsqlStructuredDataOutputNode) {
            return buildPlanForStructuredOutputNode((String) pair.getLeft(), buildStream, (KsqlStructuredDataOutputNode) outputNode, serviceId, obj, (String) pair.getLeft());
        }
        throw new KsqlException("Sink data source of type: " + outputNode.getClass() + " is not supported.");
    }

    private QueryMetadata buildPlanForBareQuery(QueuedSchemaKStream queuedSchemaKStream, KsqlBareOutputNode ksqlBareOutputNode, String str, String str2, String str3) {
        String addTimeSuffix = addTimeSuffix(getBareQueryApplicationId(str, str2));
        return new QueuedQueryMetadata(str3, buildStreams(this.builder, addTimeSuffix, this.ksqlConfig, this.overriddenStreamsProperties), ksqlBareOutputNode, queuedSchemaKStream.getExecutionPlan(""), queuedSchemaKStream.getQueue(), queuedSchemaKStream.getSourceSchemaKStreams().get(0) instanceof SchemaKTable ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM, addTimeSuffix, this.kafkaTopicClient, this.builder.build());
    }

    private QueryMetadata buildPlanForStructuredOutputNode(String str, SchemaKStream schemaKStream, KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode, String str2, String str3, String str4) {
        if (this.metaStore.getTopic(ksqlStructuredDataOutputNode.getKafkaTopicName()) == null) {
            this.metaStore.putTopic(ksqlStructuredDataOutputNode.getKsqlTopic());
        }
        KsqlTable ksqlTable = schemaKStream instanceof SchemaKTable ? new KsqlTable(str, ksqlStructuredDataOutputNode.getId().toString(), ksqlStructuredDataOutputNode.getSchema(), schemaKStream.getKeyField(), ksqlStructuredDataOutputNode.getTimestampField(), ksqlStructuredDataOutputNode.getKsqlTopic(), ksqlStructuredDataOutputNode.getId().toString() + this.ksqlConfig.get("ksql.statestore.suffix"), ((SchemaKTable) schemaKStream).isWindowed()) : new KsqlStream(str, ksqlStructuredDataOutputNode.getId().toString(), ksqlStructuredDataOutputNode.getSchema(), schemaKStream.getKeyField(), ksqlStructuredDataOutputNode.getTimestampField(), ksqlStructuredDataOutputNode.getKsqlTopic());
        if (this.updateMetastore) {
            this.metaStore.putSource(ksqlTable.cloneWithTimeKeyColumns());
        }
        QueryId persistentQueryId = ksqlTable.getPersistentQueryId();
        String str5 = str2 + str3 + persistentQueryId;
        return new PersistentQueryMetadata(str4, buildStreams(this.builder, str5, this.ksqlConfig, this.overriddenStreamsProperties), ksqlStructuredDataOutputNode, schemaKStream.getExecutionPlan(""), persistentQueryId, schemaKStream instanceof SchemaKTable ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM, str5, this.kafkaTopicClient, ksqlStructuredDataOutputNode.getSchema(), ksqlTable.getKsqlTopic(), this.builder.build());
    }

    private String getBareQueryApplicationId(String str, String str2) {
        return str + str2 + Math.abs(ThreadLocalRandom.current().nextLong());
    }

    private String addTimeSuffix(String str) {
        return String.format("%s_%d", str, Long.valueOf(System.currentTimeMillis()));
    }

    private void updateListProperty(Map<String, Object> map, String str, Object obj) {
        List list;
        Object orDefault = map.getOrDefault(str, new LinkedList());
        if (orDefault instanceof String) {
            list = new LinkedList(Arrays.asList(((String) orDefault).split("\\s*,\\s*")));
        } else {
            if (!(orDefault instanceof List)) {
                throw new KsqlException("Expecting list or string for property: " + str);
            }
            list = (List) orDefault;
        }
        list.add(obj);
        map.put(str, list);
    }

    private KafkaStreams buildStreams(StreamsBuilder streamsBuilder, String str, KsqlConfig ksqlConfig, Map<String, Object> map) {
        Map<String, Object> ksqlStreamConfigProps = ksqlConfig.getKsqlStreamConfigProps();
        ksqlStreamConfigProps.putAll(map);
        ksqlStreamConfigProps.put("application.id", str);
        ksqlStreamConfigProps.put("auto.offset.reset", ksqlConfig.get("auto.offset.reset"));
        ksqlStreamConfigProps.put("commit.interval.ms", ksqlConfig.get("commit.interval.ms"));
        ksqlStreamConfigProps.put("cache.max.bytes.buffering", ksqlConfig.get("cache.max.bytes.buffering"));
        if (ksqlConfig.get("ksql.timestamp.column.index") != null) {
            ksqlStreamConfigProps.put("ksql.timestamp.column.index", ksqlConfig.get("ksql.timestamp.column.index"));
            ksqlStreamConfigProps.put("default.timestamp.extractor", KsqlTimestampExtractor.class);
        }
        updateListProperty(ksqlStreamConfigProps, StreamsConfig.consumerPrefix("interceptor.classes"), ConsumerCollector.class.getCanonicalName());
        updateListProperty(ksqlStreamConfigProps, StreamsConfig.producerPrefix("interceptor.classes"), ProducerCollector.class.getCanonicalName());
        return this.kafkaStreamsBuilder.buildKafkaStreams(streamsBuilder, new StreamsConfig(ksqlStreamConfigProps));
    }

    String getServiceId() {
        return "_confluent-ksql-" + this.ksqlConfig.get("ksql.service.id").toString();
    }
}
