/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.planner.plan.StructuredDataSourceNode;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsBuilder;

public class JoinNode
extends PlanNode {
    private final Type type;
    private final PlanNode left;
    private final PlanNode right;
    private final Schema schema;
    private final String leftKeyFieldName;
    private final String rightKeyFieldName;
    private final String leftAlias;
    private final String rightAlias;
    private final Field keyField;

    public JoinNode(@JsonProperty(value="id") PlanNodeId id, @JsonProperty(value="type") Type type, @JsonProperty(value="left") PlanNode left, @JsonProperty(value="right") PlanNode right, @JsonProperty(value="leftKeyFieldName") String leftKeyFieldName, @JsonProperty(value="rightKeyFieldName") String rightKeyFieldName, @JsonProperty(value="leftAlias") String leftAlias, @JsonProperty(value="rightAlias") String rightAlias) {
        super(id);
        this.type = type;
        this.left = left;
        this.right = right;
        this.leftKeyFieldName = leftKeyFieldName;
        this.rightKeyFieldName = rightKeyFieldName;
        this.leftAlias = leftAlias;
        this.rightAlias = rightAlias;
        this.schema = this.buildSchema(left, right);
        this.keyField = this.schema.field(leftAlias + "." + leftKeyFieldName);
    }

    private Schema buildSchema(PlanNode left, PlanNode right) {
        String fieldName;
        Schema leftSchema = left.getSchema();
        Schema rightSchema = right.getSchema();
        SchemaBuilder schemaBuilder = SchemaBuilder.struct();
        for (Field field : leftSchema.fields()) {
            fieldName = this.leftAlias + "." + field.name();
            schemaBuilder.field(fieldName, field.schema());
        }
        for (Field field : rightSchema.fields()) {
            fieldName = this.rightAlias + "." + field.name();
            schemaBuilder.field(fieldName, field.schema());
        }
        return schemaBuilder.build();
    }

    @Override
    public Schema getSchema() {
        return this.schema;
    }

    @Override
    public Field getKeyField() {
        return this.keyField;
    }

    @Override
    public List<PlanNode> getSources() {
        return Arrays.asList(this.left, this.right);
    }

    public PlanNode getLeft() {
        return this.left;
    }

    public PlanNode getRight() {
        return this.right;
    }

    public String getLeftKeyFieldName() {
        return this.leftKeyFieldName;
    }

    public String getRightKeyFieldName() {
        return this.rightKeyFieldName;
    }

    public String getLeftAlias() {
        return this.leftAlias;
    }

    public String getRightAlias() {
        return this.rightAlias;
    }

    public Type getType() {
        return this.type;
    }

    public boolean isLeftJoin() {
        return this.type == Type.LEFT;
    }

    @Override
    public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> props, SchemaRegistryClient schemaRegistryClient) {
        if (!this.isLeftJoin()) {
            throw new KsqlException("Join type is not supported yet: " + (Object)((Object)this.getType()));
        }
        SchemaKTable table = this.tableForJoin(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props, schemaRegistryClient);
        SchemaKStream stream = this.streamForJoin(this.getLeft().buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, props, schemaRegistryClient), this.getLeftKeyFieldName());
        KsqlTopicSerDe joinSerDe = this.getResultTopicSerde(this);
        return stream.leftJoin(table, this.getSchema(), this.getSchema().field(this.getLeftAlias() + "." + stream.getKeyField().name()), joinSerDe, ksqlConfig);
    }

    SchemaKTable tableForJoin(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> props, SchemaRegistryClient schemaRegistryClient) {
        HashMap<String, Object> joinTableProps = new HashMap<String, Object>();
        joinTableProps.putAll(props);
        joinTableProps.put("auto.offset.reset", "earliest");
        SchemaKStream schemaKStream = this.right.buildStream(builder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, joinTableProps, schemaRegistryClient);
        if (!(schemaKStream instanceof SchemaKTable)) {
            throw new KsqlException("Unsupported Join. Only stream-table joins are supported, but was " + this.getLeft() + "-" + this.getRight());
        }
        return (SchemaKTable)schemaKStream;
    }

    private KsqlTopicSerDe getResultTopicSerde(PlanNode node) {
        if (node instanceof StructuredDataSourceNode) {
            StructuredDataSourceNode structuredDataSourceNode = (StructuredDataSourceNode)node;
            return structuredDataSourceNode.getStructuredDataSource().getKsqlTopic().getKsqlTopicSerDe();
        }
        if (node instanceof JoinNode) {
            JoinNode joinNode = (JoinNode)node;
            return this.getResultTopicSerde(joinNode.getLeft());
        }
        return this.getResultTopicSerde(node.getSources().get(0));
    }

    private SchemaKStream streamForJoin(SchemaKStream stream, String leftKeyFieldName) {
        if (stream.getKeyField() == null || !stream.getKeyField().name().equals(leftKeyFieldName)) {
            Field field = (Field)SchemaUtil.getFieldByName((Schema)stream.getSchema(), (String)leftKeyFieldName).orElseThrow(() -> new KsqlException("couldn't find key field: " + leftKeyFieldName + " in schema:" + this.schema));
            return stream.selectKey(field, true);
        }
        return stream;
    }

    public static enum Type {
        CROSS,
        INNER,
        LEFT,
        RIGHT,
        FULL,
        IMPLICIT;

    }
}

