/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.structured;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.codegen.CodeGenRunner;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.parser.tree.DereferenceExpression;
import io.confluent.ksql.parser.tree.Expression;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.QueuedSchemaKStream;
import io.confluent.ksql.structured.SchemaKGroupedStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.structured.SelectValueMapper;
import io.confluent.ksql.structured.SqlPredicate;
import io.confluent.ksql.util.ExpressionMetadata;
import io.confluent.ksql.util.GenericRowValueTypeEnforcer;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapper;

/**
 * Pairs a Kafka Streams {@link KStream} of {@link GenericRow} values with the Connect
 * {@link Schema} of those rows, the field currently used as key, and the upstream
 * {@code SchemaKStream}s it was derived from.
 *
 * <p>The transformation methods ({@link #filter}, {@code select}, {@link #leftJoin},
 * {@link #selectKey}) do not modify this instance; each returns a new {@code SchemaKStream}
 * tagged with the {@link Type} of the step and with this instance listed as a source.
 */
public class SchemaKStream {
    /** Separator placed between the parts of a composite GROUP BY key and of its name. */
    private static final String GROUP_BY_KEY_SEPARATOR = "|+|";

    /**
     * Column position that {@link #selectKey} overwrites with the new key when asked to
     * update the row key. NOTE(review): presumably the position of the ROWKEY column;
     * confirm against the schema layout used by callers.
     */
    private static final int ROWKEY_COLUMN_INDEX = 1;

    protected final Schema schema;
    protected final KStream<String, GenericRow> kstream;
    final Field keyField;
    final List<SchemaKStream> sourceSchemaKStreams;
    private final GenericRowValueTypeEnforcer genericRowValueTypeEnforcer;
    protected final Type type;
    protected final FunctionRegistry functionRegistry;
    private OutputNode output;
    protected final SchemaRegistryClient schemaRegistryClient;

    /**
     * Creates a schema-aware wrapper around {@code kstream}.
     *
     * @param schema schema of the rows flowing through {@code kstream}
     * @param kstream the wrapped stream
     * @param keyField field the stream is keyed by; may be {@code null}
     * @param sourceSchemaKStreams upstream streams this one was derived from
     * @param type kind of step that produced this stream
     * @param functionRegistry registry handed to predicate and expression code generation
     * @param schemaRegistryClient client handed to serde creation and derived streams
     */
    public SchemaKStream(Schema schema, KStream<String, GenericRow> kstream, Field keyField, List<SchemaKStream> sourceSchemaKStreams, Type type, FunctionRegistry functionRegistry, SchemaRegistryClient schemaRegistryClient) {
        this.schema = schema;
        this.kstream = kstream;
        this.keyField = keyField;
        this.sourceSchemaKStreams = sourceSchemaKStreams;
        this.genericRowValueTypeEnforcer = new GenericRowValueTypeEnforcer(schema);
        this.type = type;
        this.functionRegistry = functionRegistry;
        this.schemaRegistryClient = schemaRegistryClient;
    }

    /**
     * Wraps this stream in a {@link QueuedSchemaKStream}.
     *
     * @param limit optional limit forwarded to the queued stream
     * @return the queued wrapper
     */
    public QueuedSchemaKStream toQueue(Optional<Integer> limit) {
        return new QueuedSchemaKStream(this, limit);
    }

    /**
     * Writes this stream to a Kafka topic, dropping the columns at {@code rowkeyIndexes}
     * from every non-null row first. Null values are forwarded as null.
     *
     * @param kafkaTopicName topic to write to
     * @param topicValueSerDe serde used for the row values; keys are written as strings
     * @param rowkeyIndexes column positions to leave out of the written rows
     * @return this instance
     */
    public SchemaKStream into(String kafkaTopicName, Serde<GenericRow> topicValueSerDe, Set<Integer> rowkeyIndexes) {
        this.kstream
            .mapValues(row -> withoutColumns(row, rowkeyIndexes))
            .to(kafkaTopicName, Produced.with(Serdes.String(), topicValueSerDe));
        return this;
    }

    /** Returns a copy of {@code row} without the columns at {@code excludedIndexes}; null stays null. */
    private static GenericRow withoutColumns(GenericRow row, Set<Integer> excludedIndexes) {
        if (row == null) {
            return null;
        }
        List<Object> sourceColumns = row.getColumns();
        List<Object> keptColumns = new ArrayList<>();
        for (int i = 0; i < sourceColumns.size(); ++i) {
            if (!excludedIndexes.contains(i)) {
                keptColumns.add(sourceColumns.get(i));
            }
        }
        return new GenericRow(keptColumns);
    }

    /**
     * Keeps only the rows accepted by the predicate built from {@code filterExpression}.
     *
     * @param filterExpression expression compiled into a {@link SqlPredicate} against this schema
     * @return a new stream of type {@link Type#FILTER} with the same schema and key field
     */
    public SchemaKStream filter(Expression filterExpression) {
        SqlPredicate predicate = new SqlPredicate(filterExpression, this.schema, false, this.functionRegistry);
        KStream<String, GenericRow> filteredKStream = this.kstream.filter(predicate.getPredicate());
        return new SchemaKStream(this.schema, filteredKStream, this.keyField, Arrays.asList(this), Type.FILTER, this.functionRegistry, this.schemaRegistryClient);
    }

    /**
     * Projects every row onto the fields of {@code selectSchema}. Each field is looked up
     * by name in this stream's schema and its column copied into the new row, in the order
     * of {@code selectSchema.fields()}.
     *
     * @param selectSchema schema of the projected rows
     * @return a new stream of type {@link Type#PROJECT} carrying {@code selectSchema}
     */
    public SchemaKStream select(Schema selectSchema) {
        KStream<String, GenericRow> projectedKStream = this.kstream.mapValues(row -> {
            List<Object> newColumns = new ArrayList<>();
            for (Field schemaField : selectSchema.fields()) {
                newColumns.add(extractColumn(schemaField, row));
            }
            return new GenericRow(newColumns);
        });
        return new SchemaKStream(selectSchema, projectedKStream, this.keyField, Collections.singletonList(this), Type.PROJECT, this.functionRegistry, this.schemaRegistryClient);
    }

    /**
     * Projects every row through the given named expressions.
     *
     * @param expressionPairList pairs of output column name and the expression producing it
     * @return a new stream of type {@link Type#PROJECT} whose schema has one field per pair
     * @throws KsqlException if building the expression evaluators fails
     */
    public SchemaKStream select(List<Pair<String, Expression>> expressionPairList) {
        Pair<Schema, SelectValueMapper> schemaAndMapper = createSelectValueMapperAndSchema(expressionPairList);
        KStream<String, GenericRow> projectedKStream = this.kstream.mapValues(schemaAndMapper.right);
        return new SchemaKStream(schemaAndMapper.left, projectedKStream, this.keyField, Collections.singletonList(this), Type.PROJECT, this.functionRegistry, this.schemaRegistryClient);
    }

    /**
     * Builds the output schema and the row mapper for a list of named expressions.
     * One schema field is added per pair, named by the pair's left element and typed by
     * the generated evaluator's expression type.
     *
     * @param expressionPairList pairs of output column name and expression
     * @return the built schema paired with the {@link SelectValueMapper}
     * @throws KsqlException wrapping any exception raised while generating the evaluators
     */
    Pair<Schema, SelectValueMapper> createSelectValueMapperAndSchema(List<Pair<String, Expression>> expressionPairList) {
        try {
            CodeGenRunner codeGenRunner = new CodeGenRunner(this.schema, this.functionRegistry);
            SchemaBuilder schemaBuilder = SchemaBuilder.struct();
            List<ExpressionMetadata> expressionEvaluators = new ArrayList<>();
            for (Pair<String, Expression> expressionPair : expressionPairList) {
                ExpressionMetadata expressionEvaluator = codeGenRunner.buildCodeGenFromParseTree(expressionPair.getRight());
                schemaBuilder.field(expressionPair.getLeft(), expressionEvaluator.getExpressionType());
                expressionEvaluators.add(expressionEvaluator);
            }
            SelectValueMapper mapper = new SelectValueMapper(this.genericRowValueTypeEnforcer, expressionPairList, expressionEvaluators);
            return new Pair<>(schemaBuilder.build(), mapper);
        }
        catch (Exception e) {
            throw new KsqlException("Code generation failed for SelectValueMapper", e);
        }
    }

    /**
     * Left-joins this stream with a table. The joined row is the left row's columns
     * followed by the right row's columns; when there is no matching right row, nulls are
     * appended until the row has as many columns as {@code joinSchema} has fields.
     *
     * @param schemaKTable table to join against
     * @param joinSchema schema of the joined rows
     * @param joinKey key field of the resulting stream
     * @param joinSerDe source of the value serde for this (left) side of the join
     * @param ksqlConfig configuration passed to serde creation
     * @return a new stream of type {@link Type#JOIN} sourced from this stream and the table
     */
    public SchemaKStream leftJoin(SchemaKTable schemaKTable, Schema joinSchema, Field joinKey, KsqlTopicSerDe joinSerDe, KsqlConfig ksqlConfig) {
        Serde<GenericRow> leftValueSerde = joinSerDe.getGenericRowSerde(this.getSchema(), ksqlConfig, false, this.schemaRegistryClient);
        // Explicit lambda parameter types keep the joiner well-typed even if getKtable() is raw.
        KStream<String, GenericRow> joinedKStream = this.kstream.leftJoin(
            schemaKTable.getKtable(),
            (GenericRow leftGenericRow, GenericRow rightGenericRow) -> {
                List<Object> columns = new ArrayList<>(leftGenericRow.getColumns());
                if (rightGenericRow == null) {
                    // No match on the right side: pad with nulls up to the join schema width.
                    for (int i = leftGenericRow.getColumns().size(); i < joinSchema.fields().size(); ++i) {
                        columns.add(null);
                    }
                } else {
                    columns.addAll(rightGenericRow.getColumns());
                }
                return new GenericRow(columns);
            },
            Joined.with(Serdes.String(), leftValueSerde, null));
        return new SchemaKStream(joinSchema, joinedKStream, joinKey, Arrays.asList(this, schemaKTable), Type.JOIN, this.functionRegistry, this.schemaRegistryClient);
    }

    /**
     * Re-keys the stream by the string form of {@code newKeyField}'s column. Rows that are
     * null, or whose new key column is null, are dropped. If the stream is already keyed by
     * a field with the same name, this instance is returned unchanged.
     *
     * @param newKeyField field whose value becomes the record key
     * @param updateRowKey when {@code true}, the new key is also written into the row
     *     (in place) at column position {@code 1}
     * @return this instance if no re-key is needed, otherwise a new stream of type
     *     {@link Type#REKEY} keyed by {@code newKeyField}
     */
    public SchemaKStream selectKey(Field newKeyField, boolean updateRowKey) {
        if (this.keyField != null && this.keyField.name().equals(newKeyField.name())) {
            return this;
        }
        KStream<String, GenericRow> keyedKStream = this.kstream
            .filter((key, value) -> value != null && extractColumn(newKeyField, value) != null)
            .selectKey((key, value) -> extractColumn(newKeyField, value).toString())
            .mapValues((key, row) -> {
                if (updateRowKey) {
                    row.getColumns().set(ROWKEY_COLUMN_INDEX, key);
                }
                return row;
            });
        return new SchemaKStream(this.schema, keyedKStream, newKeyField, Collections.singletonList(this), Type.REKEY, this.functionRegistry, this.schemaRegistryClient);
    }

    /** Returns the column of {@code value} at the position {@code newKeyField}'s name has in this schema. */
    private Object extractColumn(Field newKeyField, GenericRow value) {
        int columnIndex = SchemaUtil.getFieldIndexByName(this.schema, newKeyField.name());
        return value.getColumns().get(columnIndex);
    }

    /** Returns the field name of a {@link DereferenceExpression}, or {@code null} for any other expression. */
    private String fieldNameFromExpression(Expression expression) {
        if (expression instanceof DereferenceExpression) {
            return ((DereferenceExpression) expression).getFieldName();
        }
        return null;
    }

    /**
     * Decides whether grouping by {@code groupByExpressions} needs a re-key. No re-key is
     * needed only when the stream has a key field and the single group-by expression is a
     * dereference of that field (compared by its un-aliased name).
     */
    private boolean rekeyRequired(List<Expression> groupByExpressions) {
        Field currentKeyField = this.getKeyField();
        if (currentKeyField == null) {
            return true;
        }
        String keyFieldName = SchemaUtil.getFieldNameWithNoAlias(currentKeyField);
        if (groupByExpressions.size() != 1) {
            return true;
        }
        // fieldNameFromExpression yields null for non-dereference expressions; such an
        // expression cannot be the existing key, so a re-key is required (the unguarded
        // equals() call here used to throw a NullPointerException instead).
        String groupByFieldName = fieldNameFromExpression(groupByExpressions.get(0));
        return groupByFieldName == null || !groupByFieldName.equals(keyFieldName);
    }

    /**
     * Groups the stream by the given expressions. When the single expression is the
     * existing key field the stream is grouped by its current key. Otherwise null rows
     * are dropped and each remaining row is re-keyed by the string values of the group-by
     * columns joined with {@code "|+|"}; the resulting key field is named by the
     * expressions' string forms joined the same way.
     *
     * @param keySerde serde for the grouping key
     * @param valSerde serde for the row values
     * @param groupByExpressions expressions to group by; each one's string form is
     *     resolved to a column position in this schema
     * @return the grouped stream, sourced from this stream
     */
    public SchemaKGroupedStream groupBy(Serde<String> keySerde, Serde<GenericRow> valSerde, List<Expression> groupByExpressions) {
        if (!rekeyRequired(groupByExpressions)) {
            KGroupedStream<String, GenericRow> groupedByExistingKey = this.kstream.groupByKey(Serialized.with(keySerde, valSerde));
            return new SchemaKGroupedStream(this.schema, groupedByExistingKey, this.keyField, Collections.singletonList(this), this.functionRegistry, this.schemaRegistryClient);
        }

        List<String> groupByNames = new ArrayList<>();
        List<Integer> newKeyIndexes = new ArrayList<>();
        for (Expression groupByExpr : groupByExpressions) {
            String expressionText = groupByExpr.toString();
            groupByNames.add(expressionText);
            newKeyIndexes.add(SchemaUtil.getIndexInSchema(expressionText, this.getSchema()));
        }
        String aggregateKeyName = String.join(GROUP_BY_KEY_SEPARATOR, groupByNames);

        KGroupedStream<String, GenericRow> kgroupedStream = this.kstream
            .filter((key, value) -> value != null)
            .groupBy((key, value) -> buildGroupByKey(value, newKeyIndexes), Serialized.with(keySerde, valSerde));
        Field newKeyField = new Field(aggregateKeyName, -1, Schema.STRING_SCHEMA);
        return new SchemaKGroupedStream(this.schema, kgroupedStream, newKeyField, Collections.singletonList(this), this.functionRegistry, this.schemaRegistryClient);
    }

    /** Joins the string values of the columns at {@code keyColumnIndexes} with the group-by separator. */
    private static String buildGroupByKey(GenericRow row, List<Integer> keyColumnIndexes) {
        StringBuilder groupKey = new StringBuilder();
        boolean first = true;
        for (int columnIndex : keyColumnIndexes) {
            if (!first) {
                groupKey.append(GROUP_BY_KEY_SEPARATOR);
            }
            first = false;
            // String.valueOf renders a null column as the text "null".
            groupKey.append(String.valueOf(row.getColumns().get(columnIndex)));
        }
        return groupKey.toString();
    }

    /** Returns the field this stream is keyed by; may be {@code null}. */
    public Field getKeyField() {
        return this.keyField;
    }

    /** Returns the schema of the rows in this stream. */
    public Schema getSchema() {
        return this.schema;
    }

    /** Returns the wrapped Kafka Streams stream (raw return type kept for existing callers). */
    public KStream getKstream() {
        return this.kstream;
    }

    /** Returns the upstream streams this one was derived from (the list passed at construction). */
    public List<SchemaKStream> getSourceSchemaKStreams() {
        return this.sourceSchemaKStreams;
    }

    /**
     * Renders this step and, recursively, its sources as an indented text plan.
     *
     * @param indent prefix for this step's line; sources are rendered one tab deeper
     * @return one line for this step followed by the lines of every source
     */
    public String getExecutionPlan(String indent) {
        StringBuilder plan = new StringBuilder();
        plan.append(indent)
            .append(" > [ ")
            .append(this.type)
            .append(" ] Schema: ")
            .append(SchemaUtil.getSchemaDefinitionString(this.schema))
            .append(".\n");
        for (SchemaKStream source : this.sourceSchemaKStreams) {
            plan.append("\t").append(indent).append(source.getExecutionPlan(indent + "\t"));
        }
        return plan.toString();
    }

    /** Returns the output node attached via {@link #setOutputNode}, or {@code null} if none was set. */
    public OutputNode outputNode() {
        return this.output;
    }

    /** Attaches the output node associated with this stream. */
    public void setOutputNode(OutputNode output) {
        this.output = output;
    }

    /** Kind of step that produced a {@code SchemaKStream}; shown in the execution plan. */
    public enum Type {
        SOURCE,
        PROJECT,
        FILTER,
        AGGREGATE,
        SINK,
        REKEY,
        JOIN,
        TOSTREAM
    }
}

