/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.function.KsqlAggregateFunction;
import io.confluent.ksql.function.udaf.KudafAggregator;
import io.confluent.ksql.function.udaf.KudafInitializer;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.parser.tree.FunctionCall;
import io.confluent.ksql.parser.tree.WindowExpression;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.planner.plan.PlanVisitor;
import io.confluent.ksql.planner.plan.StructuredDataSourceNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.SchemaKGroupedStream;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsBuilder;

public class AggregateNode
extends PlanNode {
    private final PlanNode source;
    private final Schema schema;
    private final List<Expression> groupByExpressions;
    private final WindowExpression windowExpression;
    private final List<Expression> aggregateFunctionArguments;
    private final List<FunctionCall> functionList;
    private final List<Expression> requiredColumnList;
    private final List<Expression> finalSelectExpressions;
    private final Expression havingExpressions;

    @JsonCreator
    public AggregateNode(@JsonProperty(value="id") PlanNodeId id, @JsonProperty(value="source") PlanNode source, @JsonProperty(value="schema") Schema schema, @JsonProperty(value="groupby") List<Expression> groupByExpressions, @JsonProperty(value="window") WindowExpression windowExpression, @JsonProperty(value="aggregateFunctionArguments") List<Expression> aggregateFunctionArguments, @JsonProperty(value="functionList") List<FunctionCall> functionList, @JsonProperty(value="requiredColumnList") List<Expression> requiredColumnList, @JsonProperty(value="finalSelectExpressions") List<Expression> finalSelectExpressions, @JsonProperty(value="havingExpressions") Expression havingExpressions) {
        super(id);
        this.source = source;
        this.schema = schema;
        this.groupByExpressions = groupByExpressions;
        this.windowExpression = windowExpression;
        this.aggregateFunctionArguments = aggregateFunctionArguments;
        this.functionList = functionList;
        this.requiredColumnList = requiredColumnList;
        this.finalSelectExpressions = finalSelectExpressions;
        this.havingExpressions = havingExpressions;
    }

    @Override
    public Schema getSchema() {
        return this.schema;
    }

    @Override
    public Field getKeyField() {
        return null;
    }

    @Override
    public List<PlanNode> getSources() {
        return ImmutableList.of((Object)this.source);
    }

    public PlanNode getSource() {
        return this.source;
    }

    public List<Expression> getGroupByExpressions() {
        return this.groupByExpressions;
    }

    public WindowExpression getWindowExpression() {
        return this.windowExpression;
    }

    public List<Expression> getAggregateFunctionArguments() {
        return this.aggregateFunctionArguments;
    }

    public List<FunctionCall> getFunctionList() {
        return this.functionList;
    }

    public List<Expression> getRequiredColumnList() {
        return this.requiredColumnList;
    }

    private List<Pair<String, Expression>> getFinalSelectExpressions() {
        ArrayList<Pair<String, Expression>> finalSelectExpressionList = new ArrayList<Pair<String, Expression>>();
        if (this.finalSelectExpressions.size() != this.schema.fields().size()) {
            throw new KsqlException("Incompatible aggregate schema, field count must match, selected field count:" + this.finalSelectExpressions.size() + " schema field count:" + this.schema.fields().size());
        }
        for (int i = 0; i < this.finalSelectExpressions.size(); ++i) {
            finalSelectExpressionList.add((Pair<String, Expression>)new Pair((Object)((Field)this.schema.fields().get(i)).name(), (Object)this.finalSelectExpressions.get(i)));
        }
        return finalSelectExpressionList;
    }

    public Expression getHavingExpressions() {
        return this.havingExpressions;
    }

    @Override
    public <C, R> R accept(PlanVisitor<C, R> visitor, C context) {
        return visitor.visitAggregate(this, context);
    }

    @Override
    public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> props, SchemaRegistryClient schemaRegistryClient) {
        StructuredDataSourceNode streamSourceNode = this.getTheSourceNode();
        SchemaKStream sourceSchemaKStream = this.getSource().buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props, schemaRegistryClient);
        ArrayList<Pair<String, Expression>> aggArgExpansionList = new ArrayList<Pair<String, Expression>>();
        HashMap<String, Integer> expressionNames = new HashMap<String, Integer>();
        this.collectAggregateArgExpressions(this.getRequiredColumnList(), aggArgExpansionList, expressionNames);
        this.collectAggregateArgExpressions(this.getAggregateFunctionArguments(), aggArgExpansionList, expressionNames);
        SchemaKStream aggregateArgExpanded = sourceSchemaKStream.select(aggArgExpansionList);
        KsqlTopicSerDe ksqlTopicSerDe = streamSourceNode.getStructuredDataSource().getKsqlTopic().getKsqlTopicSerDe();
        Serde genericRowSerde = ksqlTopicSerDe.getGenericRowSerde(aggregateArgExpanded.getSchema(), ksqlConfig, true, schemaRegistryClient);
        SchemaKGroupedStream schemaKGroupedStream = aggregateArgExpanded.groupBy((Serde<String>)Serdes.String(), (Serde<GenericRow>)genericRowSerde, this.getGroupByExpressions());
        SchemaBuilder aggregateSchema = SchemaBuilder.struct();
        Map<Integer, Integer> aggValToValColumnMap = this.createAggregateValueToValueColumnMap(aggregateArgExpanded, aggregateSchema);
        Schema aggStageSchema = this.buildAggregateSchema(aggregateArgExpanded.getSchema(), functionRegistry);
        Serde aggValueGenericRowSerde = ksqlTopicSerDe.getGenericRowSerde(aggStageSchema, ksqlConfig, true, schemaRegistryClient);
        KudafInitializer initializer = new KudafInitializer(aggValToValColumnMap.size());
        SchemaKTable schemaKTable = schemaKGroupedStream.aggregate(initializer, new KudafAggregator(this.createAggValToFunctionMap(expressionNames, aggregateArgExpanded, aggregateSchema, initializer, aggValToValColumnMap.size(), functionRegistry), aggValToValColumnMap), this.getWindowExpression(), (Serde<GenericRow>)aggValueGenericRowSerde);
        SchemaKTable result = new SchemaKTable(aggStageSchema, schemaKTable.getKtable(), schemaKTable.getKeyField(), schemaKTable.getSourceSchemaKStreams(), schemaKTable.isWindowed(), SchemaKStream.Type.AGGREGATE, functionRegistry, schemaRegistryClient);
        if (this.getHavingExpressions() != null) {
            result = result.filter(this.getHavingExpressions());
        }
        return result.select((List)this.getFinalSelectExpressions());
    }

    private Map<Integer, Integer> createAggregateValueToValueColumnMap(SchemaKStream aggregateArgExpanded, SchemaBuilder aggregateSchema) {
        HashMap<Integer, Integer> aggValToValColumnMap = new HashMap<Integer, Integer>();
        int nonAggColumnIndex = 0;
        for (Expression expression : this.getRequiredColumnList()) {
            String exprStr = expression.toString();
            int index = SchemaUtil.getIndexInSchema((String)exprStr, (Schema)aggregateArgExpanded.getSchema());
            aggValToValColumnMap.put(nonAggColumnIndex, index);
            ++nonAggColumnIndex;
            Field field = (Field)aggregateArgExpanded.getSchema().fields().get(index);
            aggregateSchema.field(field.name(), field.schema());
        }
        return aggValToValColumnMap;
    }

    private void collectAggregateArgExpressions(List<Expression> expressions, List<Pair<String, Expression>> aggArgExpansionList, Map<String, Integer> expressionNames) {
        expressions.stream().filter(e -> !expressionNames.containsKey(e.toString())).forEach(expression -> {
            expressionNames.put(expression.toString(), aggArgExpansionList.size());
            aggArgExpansionList.add(new Pair((Object)expression.toString(), expression));
        });
    }

    private Map<Integer, KsqlAggregateFunction> createAggValToFunctionMap(Map<String, Integer> expressionNames, SchemaKStream aggregateArgExpanded, SchemaBuilder aggregateSchema, KudafInitializer initializer, int initialUdafIndex, FunctionRegistry functionRegistry) {
        try {
            int udafIndexInAggSchema = initialUdafIndex;
            HashMap<Integer, KsqlAggregateFunction> aggValToAggFunctionMap = new HashMap<Integer, KsqlAggregateFunction>();
            for (FunctionCall functionCall : this.getFunctionList()) {
                KsqlAggregateFunction aggregateFunctionInfo = functionRegistry.getAggregateFunction(functionCall.getName().toString(), functionCall.getArguments(), aggregateArgExpanded.getSchema());
                KsqlAggregateFunction aggregateFunction = aggregateFunctionInfo.getInstance(expressionNames, functionCall.getArguments());
                aggValToAggFunctionMap.put(udafIndexInAggSchema++, aggregateFunction);
                initializer.addAggregateIntializer(aggregateFunction.getInitialValueSupplier());
                aggregateSchema.field("AGG_COL_" + udafIndexInAggSchema, aggregateFunction.getReturnType());
            }
            return aggValToAggFunctionMap;
        }
        catch (Exception e) {
            throw new KsqlException(String.format("Failed to create aggregate val to function map. expressionNames:%s", expressionNames), (Throwable)e);
        }
    }

    private Schema buildAggregateSchema(Schema schema, FunctionRegistry functionRegistry) {
        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
        List fields = schema.fields();
        for (int i = 0; i < this.getRequiredColumnList().size(); ++i) {
            schemaBuilder.field(((Field)fields.get(i)).name(), ((Field)fields.get(i)).schema());
        }
        for (int aggFunctionVarSuffix = 0; aggFunctionVarSuffix < this.getFunctionList().size(); ++aggFunctionVarSuffix) {
            String udafName = this.getFunctionList().get(aggFunctionVarSuffix).getName().getSuffix();
            KsqlAggregateFunction aggregateFunction = functionRegistry.getAggregateFunction(udafName, this.getFunctionList().get(aggFunctionVarSuffix).getArguments(), schema);
            schemaBuilder.field("KSQL_AGG_VARIABLE_" + aggFunctionVarSuffix, aggregateFunction.getReturnType());
        }
        return schemaBuilder.build();
    }
}

