/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql;

import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.QueryEngine;
import io.confluent.ksql.QueryTerminator;
import io.confluent.ksql.ddl.commands.CommandFactories;
import io.confluent.ksql.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.ddl.commands.CreateTableCommand;
import io.confluent.ksql.ddl.commands.DDLCommandExec;
import io.confluent.ksql.ddl.commands.DDLCommandResult;
import io.confluent.ksql.ddl.commands.DropSourceCommand;
import io.confluent.ksql.ddl.commands.DropTopicCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.AbstractStreamDropStatement;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.DDLStatement;
import io.confluent.ksql.parser.tree.DropStream;
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.DropTopic;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryBody;
import io.confluent.ksql.parser.tree.QuerySpecification;
import io.confluent.ksql.parser.tree.RegisterTopic;
import io.confluent.ksql.parser.tree.Relation;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MaprFSUtils;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.misc.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Entry point for parsing, planning and tracking KSQL queries.
 *
 * <p>The engine owns a {@link MetaStore}, delegates logical and physical planning to a
 * {@link QueryEngine}, and keeps bookkeeping collections of the queries it has started. A
 * single-threaded scheduler created in the constructor invokes
 * {@code KsqlEngineMetrics.updateMetrics} once a second until {@link #close()} is called.
 *
 * <p>NOTE(review): the bookkeeping collections are plain {@code HashMap}/{@code HashSet}
 * instances and nothing in this class synchronizes access to them - confirm the threading
 * model with callers.
 */
public class KsqlEngine implements Closeable, QueryTerminator {
    private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);

    /** Property names that callers may not override per request; currently empty. */
    private static final Set<String> IMMUTABLE_PROPERTIES = ImmutableSet.of();

    private final KsqlConfig ksqlConfig;
    private final MetaStore metaStore;
    private final KafkaTopicClient topicClient;
    private final DDLCommandExec ddlCommandExec;
    private final QueryEngine queryEngine;

    /** Persistent queries keyed by id. */
    private final Map<QueryId, PersistentQueryMetadata> persistentQueries;

    /** The same persistent queries as {@link #persistentQueries}, held as a set. */
    private final Set<QueryMetadata> livePersistentQueries;

    /** Every query planned through {@code planQueries} that has not been removed yet. */
    private final Set<QueryMetadata> allLiveQueries;

    private final KsqlEngineMetrics engineMetrics;
    private final ScheduledExecutorService aggregateMetricsCollector;
    private final FunctionRegistry functionRegistry;
    private final SchemaRegistryClient schemaRegistryClient;

    /**
     * Creates an engine with a fresh {@link MetaStoreImpl} and a
     * {@link CachedSchemaRegistryClient} pointed at the {@code ksql.schema.registry.url} setting.
     *
     * @param ksqlConfig engine configuration; must not be null
     * @param topicClient Kafka topic client; must not be null
     * @throws NullPointerException if either argument is null
     */
    public KsqlEngine(KsqlConfig ksqlConfig, KafkaTopicClient topicClient) {
        // Check ksqlConfig here as well: it is dereferenced before the delegate constructor runs.
        // NOTE(review): 1000 is presumably the client's cache capacity - confirm against
        // the CachedSchemaRegistryClient constructor documentation.
        this(
            ksqlConfig,
            topicClient,
            new CachedSchemaRegistryClient(
                (String) Objects.requireNonNull(ksqlConfig, "ksqlConfig can't be null")
                    .get("ksql.schema.registry.url"),
                1000),
            new MetaStoreImpl());
    }

    /**
     * Creates an engine from explicitly supplied collaborators and starts the once-per-second
     * metrics update task.
     *
     * @param ksqlConfig engine configuration; must not be null
     * @param topicClient Kafka topic client; must not be null
     * @param schemaRegistryClient schema registry client; must not be null
     * @param metaStore metastore the engine operates on; must not be null
     * @throws NullPointerException if any argument is null
     */
    public KsqlEngine(KsqlConfig ksqlConfig, KafkaTopicClient topicClient, SchemaRegistryClient schemaRegistryClient, MetaStore metaStore) {
        Objects.requireNonNull(ksqlConfig, "ksqlConfig can't be null");
        Objects.requireNonNull(topicClient, "topicClient can't be null");
        Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient can't be null");
        // Fail fast here instead of with an anonymous NPE on the first metastore access.
        Objects.requireNonNull(metaStore, "metaStore can't be null");
        MaprFSUtils.createAppDirAndInternalStreamsIfNotExist(ksqlConfig);
        this.ksqlConfig = ksqlConfig;
        this.metaStore = metaStore;
        this.topicClient = topicClient;
        this.ddlCommandExec = new DDLCommandExec(this.metaStore);
        this.queryEngine = new QueryEngine(this, new CommandFactories(topicClient, this, true));
        this.persistentQueries = new HashMap<QueryId, PersistentQueryMetadata>();
        this.livePersistentQueries = new HashSet<QueryMetadata>();
        this.allLiveQueries = new HashSet<QueryMetadata>();
        this.functionRegistry = new FunctionRegistry();
        this.schemaRegistryClient = schemaRegistryClient;
        this.engineMetrics = new KsqlEngineMetrics("ksql-engine", this);
        this.aggregateMetricsCollector = Executors.newSingleThreadScheduledExecutor();
        this.aggregateMetricsCollector.scheduleAtFixedRate(this.engineMetrics::updateMetrics, 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Parses and plans every statement in {@code queriesString} against a clone of the engine's
     * metastore, and registers the resulting queries with the engine.
     *
     * @param queriesString one or more KSQL statements
     * @param overriddenProperties per-request property overrides
     * @return metadata for the queries that were built
     * @throws IllegalArgumentException if an overridden property is listed as immutable
     * @throws Exception if planning fails
     */
    public List<QueryMetadata> buildMultipleQueries(String queriesString, Map<String, Object> overriddenProperties) throws Exception {
        for (String property : overriddenProperties.keySet()) {
            if (IMMUTABLE_PROPERTIES.contains(property)) {
                throw new IllegalArgumentException(String.format("Cannot override property '%s'", property));
            }
        }
        MetaStore tempMetaStore = this.metaStore.clone();
        List<Pair<String, Statement>> queries = this.parseQueries(queriesString, overriddenProperties, tempMetaStore);
        return this.planQueries(queries, overriddenProperties, tempMetaStore);
    }

    /**
     * Builds logical and physical plans for the statements and records the resulting queries in
     * the engine's bookkeeping collections.
     */
    private List<QueryMetadata> planQueries(List<Pair<String, Statement>> statementList, Map<String, Object> overriddenProperties, MetaStore tempMetaStore) throws Exception {
        List<Pair<String, PlanNode>> logicalPlans = this.queryEngine.buildLogicalPlans(tempMetaStore, statementList);
        List<QueryMetadata> runningQueries = this.queryEngine.buildPhysicalPlans(logicalPlans, statementList, overriddenProperties, true);
        for (QueryMetadata queryMetadata : runningQueries) {
            if (queryMetadata instanceof PersistentQueryMetadata) {
                PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
                this.livePersistentQueries.add(queryMetadata);
                this.persistentQueries.put(persistentQueryMetadata.getId(), persistentQueryMetadata);
            }
            this.allLiveQueries.add(queryMetadata);
        }
        return runningQueries;
    }

    /**
     * Plans a single query against the engine's metastore without registering it with the
     * engine's bookkeeping collections.
     *
     * @param query the query to plan
     * @return metadata of the first physical plan produced for {@code query}
     * @throws Exception if planning fails
     */
    public QueryMetadata getQueryExecutionPlan(Query query) throws Exception {
        List<Pair<String, Statement>> statementList =
            Collections.singletonList(new Pair<String, Statement>("", query));
        List<Pair<String, PlanNode>> logicalPlans = this.queryEngine.buildLogicalPlans(this.metaStore, statementList);
        List<QueryMetadata> runningQueries = this.queryEngine.buildPhysicalPlans(logicalPlans, statementList, Collections.<String, Object>emptyMap(), false);
        return runningQueries.get(0);
    }

    /**
     * Parses {@code queriesString} into (statement text, AST) pairs.
     *
     * <p>DDL statements met along the way are applied to {@code tempMetaStore} and to a
     * parser-private clone of it, so later statements in the same string can refer to sources
     * created earlier. Statements of unsupported types are dropped from the result.
     *
     * @throws ParseFailedException wrapping any exception raised while parsing
     */
    List<Pair<String, Statement>> parseQueries(String queriesString, Map<String, Object> overriddenProperties, MetaStore tempMetaStore) {
        try {
            MetaStore tempMetaStoreForParser = tempMetaStore.clone();
            KsqlParser ksqlParser = new KsqlParser();
            List<SqlBaseParser.SingleStatementContext> parsedStatements = ksqlParser.getStatements(queriesString);
            List<Pair<String, Statement>> queryList = new ArrayList<Pair<String, Statement>>();
            for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
                Statement statement = (Statement) ksqlParser.prepareStatement(singleStatementContext, tempMetaStoreForParser).getLeft();
                Pair<String, Statement> queryPair = this.buildSingleQueryAst(statement, KsqlEngine.getStatementString(singleStatementContext), tempMetaStore, tempMetaStoreForParser, overriddenProperties, false);
                if (queryPair != null) {
                    queryList.add(queryPair);
                }
            }
            return queryList;
        }
        catch (Exception e) {
            throw new ParseFailedException("Parsing failed on KsqlEngine msg:" + e.getMessage(), e);
        }
    }

    /**
     * Turns one prepared statement into a (statement text, AST) pair, applying DDL side effects
     * to the temporary metastores.
     *
     * @return the pair to plan, or {@code null} when the statement type is not handled here
     */
    private Pair<String, Statement> buildSingleQueryAst(Statement statement, String statementString, MetaStore tempMetaStore, MetaStore tempMetaStoreForParser, Map<String, Object> overriddenProperties, boolean enforceTopicExistence) {
        log.info("Building AST for {}.", statementString);
        if (statement instanceof Query) {
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof CreateStreamAsSelect) {
            CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statement;
            QuerySpecification querySpecification = (QuerySpecification) createStreamAsSelect.getQuery().getQueryBody();
            String sinkName = createStreamAsSelect.getName().getSuffix();
            Query query = this.addInto(createStreamAsSelect.getQuery(), querySpecification, sinkName, createStreamAsSelect.getProperties(), createStreamAsSelect.getPartitionByColumn());
            // Register the result source with the parser metastore only.
            tempMetaStoreForParser.putSource(this.queryEngine.getResultDatasource(querySpecification.getSelect(), sinkName).cloneWithTimeKeyColumns());
            return new Pair<String, Statement>(statementString, query);
        }
        if (statement instanceof CreateTableAsSelect) {
            CreateTableAsSelect createTableAsSelect = (CreateTableAsSelect) statement;
            QuerySpecification querySpecification = (QuerySpecification) createTableAsSelect.getQuery().getQueryBody();
            String sinkName = createTableAsSelect.getName().getSuffix();
            Query query = this.addInto(createTableAsSelect.getQuery(), querySpecification, sinkName, createTableAsSelect.getProperties(), Optional.<Expression>empty());
            tempMetaStoreForParser.putSource(this.queryEngine.getResultDatasource(querySpecification.getSelect(), sinkName).cloneWithTimeKeyColumns());
            return new Pair<String, Statement>(statementString, query);
        }
        // Each DDL branch below builds a fresh command per metastore and keeps the original
        // execution order (creates: parser store first; drops: working store first).
        if (statement instanceof RegisterTopic) {
            RegisterTopic registerTopic = (RegisterTopic) statement;
            this.ddlCommandExec.tryExecute(new RegisterTopicCommand(registerTopic), tempMetaStoreForParser);
            this.ddlCommandExec.tryExecute(new RegisterTopicCommand(registerTopic), tempMetaStore);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof CreateStream) {
            CreateStream createStream = (CreateStream) statement;
            this.ddlCommandExec.tryExecute(new CreateStreamCommand(statementString, createStream, overriddenProperties, this.topicClient, enforceTopicExistence), tempMetaStoreForParser);
            this.ddlCommandExec.tryExecute(new CreateStreamCommand(statementString, createStream, overriddenProperties, this.topicClient, enforceTopicExistence), tempMetaStore);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof CreateTable) {
            CreateTable createTable = (CreateTable) statement;
            this.ddlCommandExec.tryExecute(new CreateTableCommand(statementString, createTable, overriddenProperties, this.topicClient, enforceTopicExistence), tempMetaStoreForParser);
            this.ddlCommandExec.tryExecute(new CreateTableCommand(statementString, createTable, overriddenProperties, this.topicClient, enforceTopicExistence), tempMetaStore);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof DropStream) {
            DropStream dropStream = (DropStream) statement;
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropStream, DataSource.DataSourceType.KSTREAM, this), tempMetaStore);
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropStream, DataSource.DataSourceType.KSTREAM, this), tempMetaStoreForParser);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof DropTable) {
            DropTable dropTable = (DropTable) statement;
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropTable, DataSource.DataSourceType.KTABLE, this), tempMetaStore);
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropTable, DataSource.DataSourceType.KTABLE, this), tempMetaStoreForParser);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof DropTopic) {
            DropTopic dropTopic = (DropTopic) statement;
            this.ddlCommandExec.tryExecute(new DropTopicCommand(dropTopic), tempMetaStore);
            this.ddlCommandExec.tryExecute(new DropTopicCommand(dropTopic), tempMetaStoreForParser);
            return new Pair<String, Statement>(statementString, statement);
        }
        if (statement instanceof SetProperty) {
            return new Pair<String, Statement>(statementString, statement);
        }
        return null;
    }

    /**
     * Returns the exact source text covered by a parsed statement, from the start index of its
     * first token to the stop index of its last token.
     */
    public static String getStatementString(SqlBaseParser.SingleStatementContext singleStatementContext) {
        CharStream charStream = singleStatementContext.start.getInputStream();
        Interval span = new Interval(singleStatementContext.start.getStartIndex(), singleStatementContext.stop.getStopIndex());
        return charStream.getText(span);
    }

    /** Parses {@code sqlString} into ASTs using the engine's metastore. */
    public List<Statement> getStatements(String sqlString) {
        return new KsqlParser().buildAst(sqlString, this.metaStore);
    }

    /**
     * Returns a copy of {@code query} whose specification writes into a sink named
     * {@code intoName}.
     *
     * <p>When a partition-by expression is present it is stored under the
     * {@code "PARTITION_BY"} key in a copy of {@code intoProperties}; the caller's map is not
     * modified.
     *
     * @param query source of the limit for the returned query
     * @param querySpecification specification whose clauses are carried over
     * @param intoName name of the sink
     * @param intoProperties properties to attach to the sink
     * @param partitionByExpression optional partition-by expression
     * @return a new query targeting the sink
     */
    public Query addInto(Query query, QuerySpecification querySpecification, String intoName, Map<String, Expression> intoProperties, Optional<Expression> partitionByExpression) {
        Table intoTable = new Table(QualifiedName.of(intoName));
        if (partitionByExpression.isPresent()) {
            Map<String, Expression> newIntoProperties = new HashMap<String, Expression>(intoProperties);
            newIntoProperties.put("PARTITION_BY", partitionByExpression.get());
            intoTable.setProperties(newIntoProperties);
        } else {
            intoTable.setProperties(intoProperties);
        }
        QuerySpecification newQuerySpecification = new QuerySpecification(querySpecification.getSelect(), intoTable, querySpecification.getFrom(), querySpecification.getWindowExpression(), querySpecification.getWhere(), querySpecification.getGroupBy(), querySpecification.getHaving(), querySpecification.getLimit());
        return new Query(newQuerySpecification, query.getLimit());
    }

    /**
     * Returns the engine's set of persistent queries. This is the internal set, not a copy,
     * so changes made by the caller are visible to the engine.
     */
    public Set<QueryMetadata> getLivePersistentQueries() {
        return this.livePersistentQueries;
    }

    /** Returns the metastore this engine was constructed with. */
    public MetaStore getMetaStore() {
        return this.metaStore;
    }

    /** Returns the function registry created by this engine. */
    public FunctionRegistry getFunctionRegistry() {
        return this.functionRegistry;
    }

    /** Returns the Kafka topic client this engine was constructed with. */
    public KafkaTopicClient getTopicClient() {
        return this.topicClient;
    }

    /** Returns the DDL command executor bound to the engine's metastore. */
    public DDLCommandExec getDDLCommandExec() {
        return this.ddlCommandExec;
    }

    /**
     * Removes the persistent query with the given id from the engine's bookkeeping and
     * optionally closes it.
     *
     * @param queryId id of the query to terminate
     * @param closeStreams whether to call {@code close()} on the query
     * @return {@code false} if no persistent query with that id is registered
     */
    @Override
    public boolean terminateQuery(QueryId queryId, boolean closeStreams) {
        QueryMetadata queryMetadata = this.persistentQueries.remove(queryId);
        if (queryMetadata == null) {
            return false;
        }
        this.livePersistentQueries.remove(queryMetadata);
        this.allLiveQueries.remove(queryMetadata);
        if (closeStreams) {
            queryMetadata.close();
        }
        return true;
    }

    /**
     * Closes and unregisters the first persistent query whose entity name equals
     * {@code entity}, ignoring case. Does nothing when no query matches.
     */
    @Override
    public void terminateQueryForEntity(String entity) {
        Optional<PersistentQueryMetadata> query = this.persistentQueries.values().stream()
            .filter(candidate -> candidate.getEntity().equalsIgnoreCase(entity))
            .findFirst();
        if (!query.isPresent()) {
            return;
        }
        PersistentQueryMetadata metadata = query.get();
        log.info("Terminating persistent query {}", metadata.getId());
        metadata.close();
        this.persistentQueries.remove(metadata.getId());
        this.livePersistentQueries.remove(metadata);
        this.allLiveQueries.remove(metadata);
    }

    /** Returns a snapshot copy of the persistent queries keyed by id. */
    public Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
        return new HashMap<QueryId, PersistentQueryMetadata>(this.persistentQueries);
    }

    /** Returns a new list holding the names of the properties that may not be overridden. */
    public static List<String> getImmutableProperties() {
        return new ArrayList<String>(IMMUTABLE_PROPERTIES);
    }

    /**
     * Returns a new map combining the KSQL properties and the streams properties from the
     * engine's configuration; streams properties win on key collisions because they are added
     * last.
     */
    public Map<String, Object> getKsqlConfigProperties() {
        Map<String, Object> configProperties = new HashMap<String, Object>();
        configProperties.putAll(this.ksqlConfig.getKsqlConfigProps());
        configProperties.putAll(this.ksqlConfig.getKsqlStreamConfigProps());
        return configProperties;
    }

    /** Returns the configuration this engine was constructed with. */
    public KsqlConfig getKsqlConfig() {
        return this.ksqlConfig;
    }

    /** Returns the number of queries currently tracked as live. */
    public long numberOfLiveQueries() {
        return this.allLiveQueries.size();
    }

    /** Returns the number of persistent queries currently tracked. */
    public long numberOfPersistentQueries() {
        return this.livePersistentQueries.size();
    }

    /**
     * Closes every tracked persistent query, then the topic client and the engine metrics, and
     * finally shuts down the metrics scheduler.
     *
     * <p>Each cleanup step runs even if an earlier one throws, so a failing query or client
     * cannot leave the scheduler thread running; the exception still propagates to the caller.
     */
    @Override
    public void close() {
        try {
            for (QueryMetadata queryMetadata : this.livePersistentQueries) {
                queryMetadata.close();
            }
        } finally {
            try {
                this.topicClient.close();
            } finally {
                try {
                    this.engineMetrics.close();
                } finally {
                    this.aggregateMetricsCollector.shutdown();
                }
            }
        }
    }

    /**
     * Closes every tracked persistent query and removes it from all bookkeeping collections,
     * matching what {@link #terminateQuery} and {@link #terminateQueryForEntity} do for a
     * single query.
     *
     * @return {@code true} if every query was closed; {@code false} as soon as closing one
     *     fails, in which case the failing query and any not yet visited stay registered
     */
    @Override
    public boolean terminateAllQueries() {
        // Iterate over a snapshot so the live collections can be updated inside the loop.
        List<QueryMetadata> snapshot = new ArrayList<QueryMetadata>(this.livePersistentQueries);
        for (QueryMetadata queryMetadata : snapshot) {
            try {
                if (queryMetadata instanceof PersistentQueryMetadata) {
                    PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
                    persistentQueryMetadata.close();
                    this.persistentQueries.remove(persistentQueryMetadata.getId());
                }
            }
            catch (Exception e) {
                log.error("Failed to terminate all queries", e);
                return false;
            }
            this.livePersistentQueries.remove(queryMetadata);
            this.allLiveQueries.remove(queryMetadata);
        }
        return true;
    }

    /** Stops tracking a query as live; the query itself is not closed here. */
    public void removeTemporaryQuery(QueryMetadata queryMetadata) {
        this.allLiveQueries.remove(queryMetadata);
    }

    /** Delegates execution of a DDL statement to the query engine. */
    public DDLCommandResult executeDdlStatement(String sqlExpression, DDLStatement statement, Map<String, Object> overriddenProperties) {
        return this.queryEngine.handleDdlStatement(sqlExpression, statement, overriddenProperties);
    }

    /**
     * Returns the schema registry client.
     *
     * @throws KsqlException if the client is null
     */
    public SchemaRegistryClient getSchemaRegistryClient() {
        if (this.schemaRegistryClient == null) {
            throw new KsqlException("Cannot access the Schema Registry. Schema Registry client is null.");
        }
        return this.schemaRegistryClient;
    }

    /**
     * Parses and plans {@code queries} with no property overrides, against a clone of the
     * engine's metastore, and registers the resulting queries with the engine.
     *
     * @throws Exception if planning fails
     */
    public List<QueryMetadata> createQueries(String queries) throws Exception {
        MetaStore metaStoreCopy = this.metaStore.clone();
        List<Pair<String, Statement>> parsed = this.parseQueries(queries, Collections.<String, Object>emptyMap(), metaStoreCopy);
        return this.planQueries(parsed, new HashMap<String, Object>(), metaStoreCopy);
    }
}

