/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.physical;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlStream;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metrics.ConsumerCollector;
import io.confluent.ksql.metrics.ProducerCollector;
import io.confluent.ksql.physical.KafkaStreamsBuilder;
import io.confluent.ksql.physical.KafkaStreamsBuilderImpl;
import io.confluent.ksql.planner.plan.KsqlBareOutputNode;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.structured.QueuedSchemaKStream;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import io.confluent.ksql.util.QueuedQueryMetadata;
import io.confluent.ksql.util.timestamp.KsqlTimestampExtractor;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

/**
 * Builds the physical (Kafka Streams) side of a query from its logical plan.
 *
 * <p>{@link #buildPhysicalPlan(Pair)} asks the plan's root {@link PlanNode} to build its
 * {@link SchemaKStream}, creates a {@link KafkaStreams} instance through the configured
 * {@link KafkaStreamsBuilder}, and wraps the result in either a {@link QueuedQueryMetadata}
 * (bare output node) or a {@link PersistentQueryMetadata} (structured data output node).
 */
public class PhysicalPlanBuilder {
    private final StreamsBuilder builder;
    private final KsqlConfig ksqlConfig;
    private final KafkaTopicClient kafkaTopicClient;
    private final MetastoreUtil metastoreUtil;
    private final FunctionRegistry functionRegistry;
    private final Map<String, Object> overriddenStreamsProperties;
    private final MetaStore metaStore;
    private final boolean updateMetastore;
    private final SchemaRegistryClient schemaRegistryClient;
    private final KafkaStreamsBuilder kafkaStreamsBuilder;

    /**
     * Creates a builder that uses the supplied {@link KafkaStreamsBuilder} to instantiate
     * {@link KafkaStreams}.
     *
     * @param builder                     streams builder handed to the plan nodes and used to build the topology
     * @param ksqlConfig                  configuration read for prefixes, service id and streams properties
     * @param kafkaTopicClient            topic client passed to the plan nodes and to the query metadata
     * @param metastoreUtil               passed through to {@code PlanNode.buildStream}
     * @param functionRegistry            passed through to {@code PlanNode.buildStream}
     * @param overriddenStreamsProperties properties layered on top of the config's streams properties
     * @param updateMetastore             whether the sink of a structured-output query is registered in {@code metaStore}
     * @param metaStore                   metastore consulted/updated for sink topics and sources
     * @param schemaRegistryClient        passed through to {@code PlanNode.buildStream}
     * @param kafkaStreamsBuilder         factory used to create the {@link KafkaStreams} instance
     */
    public PhysicalPlanBuilder(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> overriddenStreamsProperties, boolean updateMetastore, MetaStore metaStore, SchemaRegistryClient schemaRegistryClient, KafkaStreamsBuilder kafkaStreamsBuilder) {
        this.builder = builder;
        this.ksqlConfig = ksqlConfig;
        this.kafkaTopicClient = kafkaTopicClient;
        this.metastoreUtil = metastoreUtil;
        this.functionRegistry = functionRegistry;
        this.overriddenStreamsProperties = overriddenStreamsProperties;
        this.metaStore = metaStore;
        this.updateMetastore = updateMetastore;
        this.schemaRegistryClient = schemaRegistryClient;
        this.kafkaStreamsBuilder = kafkaStreamsBuilder;
    }

    /**
     * Convenience constructor that uses a {@link KafkaStreamsBuilderImpl} to create
     * {@link KafkaStreams} instances; all other arguments are as in the full constructor.
     */
    public PhysicalPlanBuilder(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> overriddenStreamsProperties, boolean updateMetastore, MetaStore metaStore, SchemaRegistryClient schemaRegistryClient) {
        this(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, overriddenStreamsProperties, updateMetastore, metaStore, schemaRegistryClient, new KafkaStreamsBuilderImpl());
    }

    /**
     * Builds the physical plan for one statement.
     *
     * @param statementPlanPair the statement text (left) and the root of its logical plan (right)
     * @return metadata for a queued (bare) query or a persistent (structured output) query
     * @throws Exception     if the logical output node is a {@link KsqlBareOutputNode} but the built
     *                       stream is not a {@link QueuedSchemaKStream}, or if building the stream fails
     * @throws KsqlException if the output node is neither a bare nor a structured data output node
     */
    public QueryMetadata buildPhysicalPlan(Pair<String, PlanNode> statementPlanPair) throws Exception {
        final String statement = statementPlanPair.getLeft();
        final PlanNode planRoot = statementPlanPair.getRight();
        final SchemaKStream resultStream = planRoot.buildStream(this.builder, this.ksqlConfig, this.kafkaTopicClient, this.metastoreUtil, this.functionRegistry, this.overriddenStreamsProperties, this.schemaRegistryClient);
        final OutputNode outputNode = resultStream.outputNode();
        final String serviceId = this.getServiceId();

        if (outputNode instanceof KsqlBareOutputNode) {
            if (!(resultStream instanceof QueuedSchemaKStream)) {
                throw new Exception(String.format("Mismatch between logical and physical output; expected a QueuedSchemaKStream based on logical KsqlBareOutputNode, found a %s instead", resultStream.getClass().getCanonicalName()));
            }
            // Each prefix is read only on the path that uses it, so a missing transient
            // prefix cannot break persistent queries (and vice versa).
            final String transientQueryPrefix = this.ksqlConfig.get("ksql.transient.prefix").toString();
            return this.buildPlanForBareQuery((QueuedSchemaKStream) resultStream, (KsqlBareOutputNode) outputNode, serviceId, transientQueryPrefix, statement);
        }
        if (outputNode instanceof KsqlStructuredDataOutputNode) {
            final String persistanceQueryPrefix = this.ksqlConfig.get("ksql.persistent.prefix").toString();
            // The statement text serves both as the sink's SQL expression and as the query's statement.
            return this.buildPlanForStructuredOutputNode(statement, resultStream, (KsqlStructuredDataOutputNode) outputNode, serviceId, persistanceQueryPrefix, statement);
        }
        throw new KsqlException("Sink data source of type: " + outputNode.getClass() + " is not supported.");
    }

    /**
     * Builds the metadata for a bare (queued) query: creates the streams instance under a
     * randomized, time-suffixed application id and exposes the stream's result queue.
     */
    private QueryMetadata buildPlanForBareQuery(QueuedSchemaKStream schemaKStream, KsqlBareOutputNode bareOutputNode, String serviceId, String transientQueryPrefix, String statement) {
        final String applicationId = this.addTimeSuffix(this.getBareQueryApplicationId(serviceId, transientQueryPrefix));
        final KafkaStreams streams = this.buildStreams(this.builder, applicationId, this.ksqlConfig, this.overriddenStreamsProperties);
        // The first source stream decides whether the result is reported as a table or a stream.
        final SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);
        final DataSource.DataSourceType sourceType = sourceSchemaKstream instanceof SchemaKTable ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM;
        return new QueuedQueryMetadata(statement, streams, bareOutputNode, schemaKStream.getExecutionPlan(""), schemaKStream.getQueue(), sourceType, applicationId, this.kafkaTopicClient, this.builder.build());
    }

    /**
     * Builds the metadata for a query that writes to a structured sink: registers the sink topic
     * if the metastore does not know it, creates the sink source (table or stream), optionally
     * registers that source, and creates the streams instance under
     * {@code serviceId + persistanceQueryPrefix + queryId}.
     */
    private QueryMetadata buildPlanForStructuredOutputNode(String sqlExpression, SchemaKStream schemaKStream, KsqlStructuredDataOutputNode outputNode, String serviceId, String persistanceQueryPrefix, String statement) {
        // NOTE(review): declared as KsqlStream but also assigned a KsqlTable below; this looks like a
        // decompiler artifact - confirm the intended common supertype of the two sink kinds.
        KsqlStream sinkDataSource;
        if (this.metaStore.getTopic(outputNode.getKafkaTopicName()) == null) {
            this.metaStore.putTopic(outputNode.getKsqlTopic());
        }
        if (schemaKStream instanceof SchemaKTable) {
            SchemaKTable schemaKTable = (SchemaKTable)schemaKStream;
            sinkDataSource = new KsqlTable(sqlExpression, outputNode.getId().toString(), outputNode.getSchema(), schemaKStream.getKeyField(), outputNode.getTimestampField(), outputNode.getKsqlTopic(), outputNode.getId().toString() + this.ksqlConfig.get("ksql.statestore.suffix"), schemaKTable.isWindowed());
        } else {
            sinkDataSource = new KsqlStream(sqlExpression, outputNode.getId().toString(), outputNode.getSchema(), schemaKStream.getKeyField(), outputNode.getTimestampField(), outputNode.getKsqlTopic());
        }
        if (this.updateMetastore) {
            this.metaStore.putSource(sinkDataSource.cloneWithTimeKeyColumns());
        }
        final QueryId queryId = sinkDataSource.getPersistentQueryId();
        final String applicationId = serviceId + persistanceQueryPrefix + queryId;
        final KafkaStreams streams = this.buildStreams(this.builder, applicationId, this.ksqlConfig, this.overriddenStreamsProperties);
        final Topology topology = this.builder.build();
        final DataSource.DataSourceType sinkType = schemaKStream instanceof SchemaKTable ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM;
        return new PersistentQueryMetadata(statement, streams, outputNode, schemaKStream.getExecutionPlan(""), queryId, sinkType, applicationId, this.kafkaTopicClient, outputNode.getSchema(), sinkDataSource.getKsqlTopic(), topology);
    }

    /**
     * Returns {@code serviceId + transientQueryPrefix} followed by a random non-negative number.
     */
    private String getBareQueryApplicationId(String serviceId, String transientQueryPrefix) {
        // nextLong(bound) yields a value in [0, bound); Math.abs(nextLong()) would return a
        // negative number for Long.MIN_VALUE and put a '-' into the application id.
        return serviceId + transientQueryPrefix + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    }

    /**
     * Appends {@code "_" + System.currentTimeMillis()} to the given string.
     */
    private String addTimeSuffix(String original) {
        return String.format("%s_%d", original, System.currentTimeMillis());
    }

    /**
     * Appends {@code value} to the list-valued property {@code key}.
     *
     * <p>The current value may be absent, a comma-separated string, or a list. The property is
     * always replaced by a freshly created list, so a list instance supplied by the caller
     * (for example via the overridden properties) is never modified.
     *
     * @throws KsqlException if the current value is neither a string nor a list
     */
    private void updateListProperty(Map<String, Object> properties, String key, Object value) {
        final Object existing = properties.get(key);
        final List<Object> valueList = new LinkedList<>();
        if (existing instanceof String) {
            // Split on commas, tolerating surrounding whitespace; blank entries are dropped so an
            // empty or comma-padded string does not yield empty class names.
            for (final String entry : ((String) existing).trim().split("\\s*,\\s*")) {
                if (!entry.isEmpty()) {
                    valueList.add(entry);
                }
            }
        } else if (existing instanceof List) {
            // Copy instead of mutating in place: the list may be shared with the caller's map or be unmodifiable.
            valueList.addAll((List<?>) existing);
        } else if (existing != null) {
            throw new KsqlException("Expecting list or string for property: " + key);
        }
        valueList.add(value);
        properties.put(key, valueList);
    }

    /**
     * Assembles the streams properties and creates the {@link KafkaStreams} instance.
     *
     * <p>Starts from the config's streams properties, applies {@code overriddenProperties}, then sets
     * the application id and a few settings read from {@code ksqlConfig}, and finally registers the
     * metrics collectors as consumer and producer interceptors.
     */
    private KafkaStreams buildStreams(StreamsBuilder builder, String applicationId, KsqlConfig ksqlConfig, Map<String, Object> overriddenProperties) {
        // NOTE(review): if getKsqlStreamConfigProps() returns the config's live map rather than a
        // copy, the puts below modify shared configuration state - confirm.
        final Map<String, Object> newStreamsProperties = ksqlConfig.getKsqlStreamConfigProps();
        newStreamsProperties.putAll(overriddenProperties);
        newStreamsProperties.put("application.id", applicationId);
        newStreamsProperties.put("auto.offset.reset", ksqlConfig.get("auto.offset.reset"));
        newStreamsProperties.put("commit.interval.ms", ksqlConfig.get("commit.interval.ms"));
        newStreamsProperties.put("cache.max.bytes.buffering", ksqlConfig.get("cache.max.bytes.buffering"));
        final Object timestampColumnIndex = ksqlConfig.get("ksql.timestamp.column.index");
        if (timestampColumnIndex != null) {
            // A configured timestamp column switches the default extractor to the KSQL one.
            newStreamsProperties.put("ksql.timestamp.column.index", timestampColumnIndex);
            newStreamsProperties.put("default.timestamp.extractor", KsqlTimestampExtractor.class);
        }
        this.updateListProperty(newStreamsProperties, StreamsConfig.consumerPrefix("interceptor.classes"), ConsumerCollector.class.getCanonicalName());
        this.updateListProperty(newStreamsProperties, StreamsConfig.producerPrefix("interceptor.classes"), ProducerCollector.class.getCanonicalName());
        return this.kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
    }

    /**
     * Returns {@code "_confluent-ksql-"} followed by the configured {@code ksql.service.id}.
     */
    String getServiceId() {
        return "_confluent-ksql-" + this.ksqlConfig.get("ksql.service.id").toString();
    }
}

