package io.confluent.ksql;

import com.google.common.collect.ImmutableSet;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.ddl.commands.CommandFactories;
import io.confluent.ksql.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.ddl.commands.CreateTableCommand;
import io.confluent.ksql.ddl.commands.DDLCommandExec;
import io.confluent.ksql.ddl.commands.DDLCommandResult;
import io.confluent.ksql.ddl.commands.DropSourceCommand;
import io.confluent.ksql.ddl.commands.DropTopicCommand;
import io.confluent.ksql.ddl.commands.RegisterTopicCommand;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;
import io.confluent.ksql.parser.exception.ParseFailedException;
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.DDLStatement;
import io.confluent.ksql.parser.tree.DropStream;
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.DropTopic;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.QualifiedName;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QuerySpecification;
import io.confluent.ksql.parser.tree.RegisterTopic;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.parser.tree.Table;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MaprFSUtils;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.antlr.v4.runtime.misc.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

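/**
 * Entry point for turning KSQL text into running queries: parses statements, applies DDL to
 * the metastore, plans queries through {@link QueryEngine}, and tracks the queries it started
 * so they can be terminated later.
 *
 * <p>Decompiled from: input_file:io/confluent/ksql/KsqlEngine.class
 */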
public class KsqlEngine implements Closeable, QueryTerminator {
    private static final Logger log = LoggerFactory.getLogger(KsqlEngine.class);
    // Property names that buildMultipleQueries refuses to accept as overrides; currently empty.
    private static final Set<String> IMMUTABLE_PROPERTIES = ImmutableSet.of();
    // Assigned once in the constructor; not reassigned anywhere in this class.
    private KsqlConfig ksqlConfig;
    // The engine's primary metastore; query building works on clones of it.
    private final MetaStore metaStore;
    private final KafkaTopicClient topicClient;
    // Executes DDL commands against a given metastore.
    private final DDLCommandExec ddlCommandExec;
    // Builds logical and physical plans and handles DDL statements on behalf of this engine.
    private final QueryEngine queryEngine;
    // Persistent queries keyed by id; filled in planQueries, pruned by terminateQuery/terminateQueryForEntity.
    private final Map<QueryId, PersistentQueryMetadata> persistentQueries;
    // Same persistent queries as a set; this is what close() and terminateAllQueries() iterate.
    private final Set<QueryMetadata> livePersistentQueries;
    // Every query produced by planQueries, persistent or not, until it is terminated or removed.
    private final Set<QueryMetadata> allLiveQueries;
    private final KsqlEngineMetrics engineMetrics;
    // Single-threaded scheduler that calls engineMetrics.updateMetrics every 1000 ms.
    private final ScheduledExecutorService aggregateMetricsCollector;
    private final FunctionRegistry functionRegistry;
    // Set from the constructor argument; getSchemaRegistryClient() throws if it is null.
    private SchemaRegistryClient schemaRegistryClient;

    /**
     * Creates an engine with default collaborators: a {@link CachedSchemaRegistryClient} built
     * from the {@code ksql.schema.registry.url} config value and a fresh {@link MetaStoreImpl}.
     *
     * <p>NOTE(review): {@code ksqlConfig} is dereferenced here before the delegated constructor
     * performs its null checks, so a null config fails with a message-less NPE.
     *
     * @param ksqlConfig engine configuration; must contain the schema registry URL
     * @param kafkaTopicClient client used for topic operations
     */
    public KsqlEngine(KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient) {
        // The second argument (1000) is presumably the client's schema cache capacity - TODO confirm.
        this(ksqlConfig, kafkaTopicClient, new CachedSchemaRegistryClient((String) ksqlConfig.get("ksql.schema.registry.url"), 1000), new MetaStoreImpl());
    }

    /**
     * Creates an engine from explicitly supplied collaborators and starts the periodic
     * metrics refresh.
     *
     * @param ksqlConfig engine configuration
     * @param kafkaTopicClient client used for topic operations
     * @param schemaRegistryClient schema registry client exposed via {@link #getSchemaRegistryClient()}
     * @param metaStore the engine's primary metastore
     * @throws NullPointerException if any argument is null
     */
    public KsqlEngine(KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, SchemaRegistryClient schemaRegistryClient, MetaStore metaStore) {
        // Validate everything up front, before any side effect below is triggered.
        Objects.requireNonNull(ksqlConfig, "ksqlConfig can't be null");
        Objects.requireNonNull(kafkaTopicClient, "topicClient can't be null");
        Objects.requireNonNull(schemaRegistryClient, "schemaRegistryClient can't be null");
        Objects.requireNonNull(metaStore, "metaStore can't be null");
        MaprFSUtils.createAppDirAndInternalStreamsIfNotExist(ksqlConfig);
        this.ksqlConfig = ksqlConfig;
        this.metaStore = metaStore;
        this.topicClient = kafkaTopicClient;
        this.ddlCommandExec = new DDLCommandExec(this.metaStore);
        // NOTE: 'this' is handed out while the fields below are still unassigned; the
        // assignment order is kept exactly as before so collaborators observe the same state.
        this.queryEngine = new QueryEngine(this, new CommandFactories(kafkaTopicClient, this, true));
        this.persistentQueries = new HashMap<>();
        this.livePersistentQueries = new HashSet<>();
        this.allLiveQueries = new HashSet<>();
        this.functionRegistry = new FunctionRegistry();
        this.schemaRegistryClient = schemaRegistryClient;
        this.engineMetrics = new KsqlEngineMetrics("ksql-engine", this);
        this.aggregateMetricsCollector = Executors.newSingleThreadScheduledExecutor();
        // Refresh the aggregate metrics once per second, starting one second from now.
        this.aggregateMetricsCollector.scheduleAtFixedRate(this.engineMetrics::updateMetrics, 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Parses and plans every statement in {@code str} against a clone of the engine metastore.
     *
     * @param str one or more KSQL statements
     * @param map property overrides to apply while building
     * @return metadata for each query that was planned
     * @throws IllegalArgumentException if {@code map} overrides an immutable property
     * @throws Exception if planning fails
     */
    public List<QueryMetadata> buildMultipleQueries(String str, Map<String, Object> map) throws Exception {
        // Reject the first override (in key iteration order) that targets an immutable property.
        Optional<String> rejected = map.keySet().stream()
                .filter(IMMUTABLE_PROPERTIES::contains)
                .findFirst();
        if (rejected.isPresent()) {
            throw new IllegalArgumentException(String.format("Cannot override property '%s'", rejected.get()));
        }
        MetaStore workingMetaStore = this.metaStore.clone();
        List<Pair<String, Statement>> parsed = parseQueries(str, map, workingMetaStore);
        return planQueries(parsed, map, workingMetaStore);
    }

    /**
     * Builds logical and then physical plans for the given statements and registers every
     * resulting query in the engine's bookkeeping collections.
     *
     * @param list statement text paired with its AST
     * @param map property overrides passed to the physical planner
     * @param metaStore metastore the logical plans are built against
     * @return the planned queries, in the order returned by the query engine
     */
    private List<QueryMetadata> planQueries(List<Pair<String, Statement>> list, Map<String, Object> map, MetaStore metaStore) throws Exception {
        List<QueryMetadata> planned = this.queryEngine.buildPhysicalPlans(
                this.queryEngine.buildLogicalPlans(metaStore, list), list, map, true);
        for (QueryMetadata query : planned) {
            if (query instanceof PersistentQueryMetadata) {
                PersistentQueryMetadata persistent = (PersistentQueryMetadata) query;
                this.livePersistentQueries.add(persistent);
                this.persistentQueries.put(persistent.getId(), persistent);
            }
            // Every planned query counts as live, persistent or not.
            this.allLiveQueries.add(query);
        }
        return planned;
    }

    /**
     * Plans a single query against the engine's own metastore, using an empty statement text
     * and no property overrides, and returns the first resulting plan. The result is not added
     * to any of the engine's live-query collections.
     *
     * @param query the query to plan
     * @return metadata describing the planned query
     * @throws Exception if planning fails
     */
    public QueryMetadata getQueryExecutionPlan(Query query) throws Exception {
        // Two independent single-element lists, one per planning phase.
        List<Pair<String, Statement>> logicalInput = Collections.singletonList(new Pair<String, Statement>("", query));
        List<Pair<String, Statement>> physicalInput = Collections.singletonList(new Pair<String, Statement>("", query));
        List<QueryMetadata> plans = this.queryEngine.buildPhysicalPlans(
                this.queryEngine.buildLogicalPlans(this.metaStore, logicalInput),
                physicalInput,
                Collections.emptyMap(),
                false);
        return plans.get(0);
    }

    /**
     * Parses {@code str} into statements and converts each one into a (text, AST) pair.
     * Statements for which no pair is produced are skipped.
     *
     * @param str one or more KSQL statements
     * @param map property overrides forwarded to DDL command construction
     * @param metaStore metastore that DDL statements are applied to; a clone of it is handed to the parser
     * @return the pairs in statement order
     * @throws ParseFailedException wrapping any exception raised while parsing or building
     */
    List<Pair<String, Statement>> parseQueries(String str, Map<String, Object> map, MetaStore metaStore) {
        try {
            MetaStore parserMetaStore = metaStore.clone();
            KsqlParser parser = new KsqlParser();
            List<Pair<String, Statement>> parsed = new ArrayList<>();
            for (SqlBaseParser.SingleStatementContext context : parser.getStatements(str)) {
                Statement statement = (Statement) parser.prepareStatement(context, parserMetaStore).getLeft();
                String statementText = getStatementString(context);
                Pair<String, Statement> entry =
                        buildSingleQueryAst(statement, statementText, metaStore, parserMetaStore, map, false);
                if (entry != null) {
                    parsed.add(entry);
                }
            }
            return parsed;
        } catch (Exception e) {
            throw new ParseFailedException("Parsing failed on KsqlEngine msg:" + e.getMessage(), e);
        }
    }

    /**
     * Converts one parsed statement into the (text, AST) pair handed to the planner.
     *
     * <ul>
     *   <li>Plain queries and SET statements are passed through unchanged.</li>
     *   <li>CREATE ... AS SELECT statements are rewritten into a query with an INTO target, and
     *       the resulting source is registered in {@code metaStore2} only.</li>
     *   <li>Other DDL statements are executed against both metastores via {@code tryExecute},
     *       each time with a freshly constructed command.</li>
     * </ul>
     *
     * @param statement the parsed statement
     * @param str the original statement text
     * @param metaStore the caller's metastore
     * @param metaStore2 the second metastore (the parser's clone when called from parseQueries)
     * @param map property overrides passed to CREATE STREAM/TABLE commands
     * @param z flag forwarded verbatim to the CREATE STREAM/TABLE command constructors
     * @return the pair, or {@code null} for statement types not handled here
     */
    private Pair<String, Statement> buildSingleQueryAst(Statement statement, String str, MetaStore metaStore, MetaStore metaStore2, Map<String, Object> map, boolean z) {
        log.info("Building AST for {}.", str);

        final Statement result;
        if (statement instanceof Query) {
            result = statement;
        } else if (statement instanceof CreateStreamAsSelect) {
            CreateStreamAsSelect csas = (CreateStreamAsSelect) statement;
            QuerySpecification spec = (QuerySpecification) csas.getQuery().getQueryBody();
            String sinkName = csas.getName().getSuffix();
            Query withSink = addInto(csas.getQuery(), spec, sinkName, csas.getProperties(), csas.getPartitionByColumn());
            metaStore2.putSource(
                    this.queryEngine.getResultDatasource(spec.getSelect(), sinkName).cloneWithTimeKeyColumns());
            result = withSink;
        } else if (statement instanceof CreateTableAsSelect) {
            CreateTableAsSelect ctas = (CreateTableAsSelect) statement;
            QuerySpecification spec = (QuerySpecification) ctas.getQuery().getQueryBody();
            String sinkName = ctas.getName().getSuffix();
            Query withSink = addInto(ctas.getQuery(), spec, sinkName, ctas.getProperties(), Optional.empty());
            metaStore2.putSource(
                    this.queryEngine.getResultDatasource(spec.getSelect(), sinkName).cloneWithTimeKeyColumns());
            result = withSink;
        } else if (statement instanceof RegisterTopic) {
            RegisterTopic registerTopic = (RegisterTopic) statement;
            this.ddlCommandExec.tryExecute(new RegisterTopicCommand(registerTopic), metaStore2);
            this.ddlCommandExec.tryExecute(new RegisterTopicCommand(registerTopic), metaStore);
            result = statement;
        } else if (statement instanceof CreateStream) {
            CreateStream createStream = (CreateStream) statement;
            this.ddlCommandExec.tryExecute(new CreateStreamCommand(str, createStream, map, this.topicClient, z), metaStore2);
            this.ddlCommandExec.tryExecute(new CreateStreamCommand(str, createStream, map, this.topicClient, z), metaStore);
            result = statement;
        } else if (statement instanceof CreateTable) {
            CreateTable createTable = (CreateTable) statement;
            this.ddlCommandExec.tryExecute(new CreateTableCommand(str, createTable, map, this.topicClient, z), metaStore2);
            this.ddlCommandExec.tryExecute(new CreateTableCommand(str, createTable, map, this.topicClient, z), metaStore);
            result = statement;
        } else if (statement instanceof DropStream) {
            // Drops run against the caller's metastore first, then the second one.
            DropStream dropStream = (DropStream) statement;
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropStream, DataSource.DataSourceType.KSTREAM, this), metaStore);
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropStream, DataSource.DataSourceType.KSTREAM, this), metaStore2);
            result = statement;
        } else if (statement instanceof DropTable) {
            DropTable dropTable = (DropTable) statement;
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropTable, DataSource.DataSourceType.KTABLE, this), metaStore);
            this.ddlCommandExec.tryExecute(new DropSourceCommand(dropTable, DataSource.DataSourceType.KTABLE, this), metaStore2);
            result = statement;
        } else if (statement instanceof DropTopic) {
            DropTopic dropTopic = (DropTopic) statement;
            this.ddlCommandExec.tryExecute(new DropTopicCommand(dropTopic), metaStore);
            this.ddlCommandExec.tryExecute(new DropTopicCommand(dropTopic), metaStore2);
            result = statement;
        } else if (statement instanceof SetProperty) {
            result = statement;
        } else {
            return null;
        }
        return new Pair<>(str, result);
    }

    /**
     * Extracts the original text of a statement from its input stream, spanning from the start
     * index of the first token to the stop index of the last token.
     *
     * @param singleStatementContext the parsed statement context
     * @return the statement text as it appeared in the input
     */
    public static String getStatementString(SqlBaseParser.SingleStatementContext singleStatementContext) {
        int firstChar = singleStatementContext.start.getStartIndex();
        int lastChar = singleStatementContext.stop.getStopIndex();
        return singleStatementContext.start.getInputStream().getText(new Interval(firstChar, lastChar));
    }

    /**
     * Builds the AST for every statement in {@code str} using a new parser and the engine's
     * own metastore.
     *
     * @param str one or more KSQL statements
     * @return the parsed statements
     */
    public List<Statement> getStatements(String str) {
        KsqlParser parser = new KsqlParser();
        return parser.buildAst(str, this.metaStore);
    }

    /**
     * Returns a copy of {@code query} whose specification writes into a table named {@code str}.
     *
     * @param query the source query; only its limit is carried over
     * @param querySpecification the specification whose clauses are copied
     * @param str name of the target table
     * @param map properties to attach to the target table
     * @param optional partition-by expression; when present it is added under the key
     *     {@code PARTITION_BY} to a copy of {@code map}
     * @return a new query with the target table set
     */
    public Query addInto(Query query, QuerySpecification querySpecification, String str, Map<String, Expression> map, Optional<Expression> optional) {
        Map<String, Expression> sinkProperties = map;
        Table sink = new Table(QualifiedName.of(str));
        if (optional.isPresent()) {
            // Copy first so the caller's property map is never modified.
            sinkProperties = new HashMap<>(map);
            sinkProperties.put("PARTITION_BY", optional.get());
        }
        sink.setProperties(sinkProperties);

        QuerySpecification withSink = new QuerySpecification(
                querySpecification.getSelect(),
                sink,
                querySpecification.getFrom(),
                querySpecification.getWindowExpression(),
                querySpecification.getWhere(),
                querySpecification.getGroupBy(),
                querySpecification.getHaving(),
                querySpecification.getLimit());
        return new Query(withSink, query.getLimit());
    }

    /**
     * Returns the engine's own set of live persistent queries (not a copy), so changes made by
     * the caller are visible to the engine and vice versa.
     */
    public Set<QueryMetadata> getLivePersistentQueries() {
        return livePersistentQueries;
    }

    /** Returns the engine's primary metastore instance. */
    public MetaStore getMetaStore() {
        return metaStore;
    }

    /** Returns the function registry created together with this engine. */
    public FunctionRegistry getFunctionRegistry() {
        return functionRegistry;
    }

    /** Returns the topic client supplied at construction time. */
    public KafkaTopicClient getTopicClient() {
        return topicClient;
    }

    /** Returns the DDL executor bound to the engine's primary metastore. */
    public DDLCommandExec getDDLCommandExec() {
        return ddlCommandExec;
    }

    /**
     * Removes the persistent query with the given id from all of the engine's bookkeeping
     * collections and optionally closes it.
     *
     * @param queryId id of the query to terminate
     * @param z when {@code true} the query is also closed; when {@code false} it is only untracked
     * @return {@code false} if no persistent query with that id is registered, {@code true} otherwise
     */
    @Override // io.confluent.ksql.QueryTerminator
    public boolean terminateQuery(QueryId queryId, boolean z) {
        PersistentQueryMetadata terminated = persistentQueries.remove(queryId);
        if (terminated == null) {
            return false;
        }
        livePersistentQueries.remove(terminated);
        allLiveQueries.remove(terminated);
        if (z) {
            terminated.close();
        }
        return true;
    }

    /**
     * Closes and untracks the first registered persistent query whose entity name matches
     * {@code str}, ignoring case. Does nothing when no query matches; any further matching
     * queries are left running.
     *
     * @param str entity name to look for
     */
    @Override // io.confluent.ksql.QueryTerminator
    public void terminateQueryForEntity(String str) {
        for (PersistentQueryMetadata candidate : this.persistentQueries.values()) {
            if (!candidate.getEntity().equalsIgnoreCase(str)) {
                continue;
            }
            log.info("Terminating persistent query {}", candidate.getId());
            candidate.close();
            this.persistentQueries.remove(candidate.getId());
            this.livePersistentQueries.remove(candidate);
            this.allLiveQueries.remove(candidate);
            // Stop right after the first match; the map was just modified, so iteration must not continue.
            return;
        }
    }

    /**
     * Returns a snapshot copy of the registered persistent queries keyed by id; modifying the
     * returned map does not affect the engine.
     */
    public Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
        Map<QueryId, PersistentQueryMetadata> snapshot = new HashMap<>(persistentQueries);
        return snapshot;
    }

    /** Returns a new mutable list containing the names of the properties that cannot be overridden. */
    public static List<String> getImmutableProperties() {
        List<String> names = new ArrayList<>(IMMUTABLE_PROPERTIES);
        return names;
    }

    /**
     * Returns a new map combining the KSQL config properties with the streams config
     * properties; on a key collision the streams value wins because it is added last.
     */
    public Map<String, Object> getKsqlConfigProperties() {
        Map<String, Object> combined = new HashMap<>(ksqlConfig.getKsqlConfigProps());
        combined.putAll(ksqlConfig.getKsqlStreamConfigProps());
        return combined;
    }

    /** Returns the configuration this engine was created with. */
    public KsqlConfig getKsqlConfig() {
        return ksqlConfig;
    }

    /** Returns how many queries, persistent or not, are currently tracked as live. */
    public long numberOfLiveQueries() {
        return allLiveQueries.size();
    }

    /** Returns how many persistent queries are currently tracked as live. */
    public long numberOfPersistentQueries() {
        return livePersistentQueries.size();
    }

    /**
     * Shuts the engine down: closes every live persistent query, then the topic client, then
     * the engine metrics, and finally stops the metrics scheduler. Queries are closed but not
     * removed from the bookkeeping collections.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        for (QueryMetadata query : livePersistentQueries) {
            query.close();
        }
        topicClient.close();
        engineMetrics.close();
        aggregateMetricsCollector.shutdown();
    }

    /**
     * Closes every live persistent query and removes it from all of the engine's bookkeeping
     * collections, mirroring what {@link #terminateQuery(QueryId, boolean)} does for one query.
     *
     * <p>Processing stops at the first failure; queries handled before the failure stay
     * terminated and untracked, the remaining ones stay registered.
     *
     * @return {@code true} if every query was terminated, {@code false} if closing one failed
     */
    @Override // io.confluent.ksql.QueryTerminator
    public boolean terminateAllQueries() {
        try {
            // Iterate over a snapshot so the live set can be pruned while walking it.
            for (QueryMetadata queryMetadata : new ArrayList<>(this.livePersistentQueries)) {
                if (queryMetadata instanceof PersistentQueryMetadata) {
                    PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata;
                    persistentQuery.close();
                    // Only drop the mapping if it still points at this exact query.
                    this.persistentQueries.remove(persistentQuery.getId(), persistentQuery);
                }
                this.livePersistentQueries.remove(queryMetadata);
                this.allLiveQueries.remove(queryMetadata);
            }
            return true;
        } catch (Exception e) {
            // Keep the boolean contract, but never lose the reason for the failure.
            log.error("Failed to terminate all persistent queries", e);
            return false;
        }
    }

    /**
     * Stops tracking the given query as live. The query itself is not closed here.
     *
     * @param queryMetadata the query to remove from the live set
     */
    public void removeTemporaryQuery(QueryMetadata queryMetadata) {
        allLiveQueries.remove(queryMetadata);
    }

    /**
     * Delegates a DDL statement to the query engine.
     *
     * @param str the original statement text
     * @param dDLStatement the parsed DDL statement
     * @param map property overrides to apply
     * @return the result reported by the query engine
     */
    public DDLCommandResult executeDdlStatement(String str, DDLStatement dDLStatement, Map<String, Object> map) {
        DDLCommandResult outcome = queryEngine.handleDdlStatement(str, dDLStatement, map);
        return outcome;
    }

    /**
     * Returns the schema registry client.
     *
     * @throws KsqlException if no client is set
     */
    public SchemaRegistryClient getSchemaRegistryClient() {
        if (schemaRegistryClient == null) {
            throw new KsqlException("Cannot access the Schema Registry. Schema Registry client is null.");
        }
        return schemaRegistryClient;
    }

    /**
     * Parses and plans every statement in {@code str} against a clone of the engine metastore,
     * without any property overrides.
     *
     * @param str one or more KSQL statements
     * @return metadata for each query that was planned
     * @throws Exception if planning fails
     */
    public List<QueryMetadata> createQueries(String str) throws Exception {
        MetaStore workingMetaStore = this.metaStore.clone();
        // Parsing gets an immutable empty map; planning gets its own fresh mutable map.
        List<Pair<String, Statement>> parsed = parseQueries(str, Collections.emptyMap(), workingMetaStore);
        Map<String, Object> noOverrides = new HashMap<>();
        return planQueries(parsed, noOverrides, workingMetaStore);
    }
}
