package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.SchemaUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsBuilder;

/* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode.class */
public class JoinNode extends PlanNode {
    private final Type type;
    private final PlanNode left;
    private final PlanNode right;
    private final Schema schema;
    private final String leftKeyFieldName;
    private final String rightKeyFieldName;
    private final String leftAlias;
    private final String rightAlias;
    private final Field keyField;

    /* loaded from: input_file:io/confluent/ksql/planner/plan/JoinNode$Type.class */
    public enum Type {
        CROSS,
        INNER,
        LEFT,
        RIGHT,
        FULL,
        IMPLICIT
    }

    public JoinNode(@JsonProperty("id") PlanNodeId planNodeId, @JsonProperty("type") Type type, @JsonProperty("left") PlanNode planNode, @JsonProperty("right") PlanNode planNode2, @JsonProperty("leftKeyFieldName") String str, @JsonProperty("rightKeyFieldName") String str2, @JsonProperty("leftAlias") String str3, @JsonProperty("rightAlias") String str4) {
        super(planNodeId);
        this.type = type;
        this.left = planNode;
        this.right = planNode2;
        this.leftKeyFieldName = str;
        this.rightKeyFieldName = str2;
        this.leftAlias = str3;
        this.rightAlias = str4;
        this.schema = buildSchema(planNode, planNode2);
        this.keyField = this.schema.field(str3 + "." + str);
    }

    private Schema buildSchema(PlanNode planNode, PlanNode planNode2) {
        Schema schema = planNode.getSchema();
        Schema schema2 = planNode2.getSchema();
        SchemaBuilder struct = SchemaBuilder.struct();
        for (Field field : schema.fields()) {
            struct.field(this.leftAlias + "." + field.name(), field.schema());
        }
        for (Field field2 : schema2.fields()) {
            struct.field(this.rightAlias + "." + field2.name(), field2.schema());
        }
        return struct.build();
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public Schema getSchema() {
        return this.schema;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public Field getKeyField() {
        return this.keyField;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public List<PlanNode> getSources() {
        return Arrays.asList(this.left, this.right);
    }

    public PlanNode getLeft() {
        return this.left;
    }

    public PlanNode getRight() {
        return this.right;
    }

    public String getLeftKeyFieldName() {
        return this.leftKeyFieldName;
    }

    public String getRightKeyFieldName() {
        return this.rightKeyFieldName;
    }

    public String getLeftAlias() {
        return this.leftAlias;
    }

    public String getRightAlias() {
        return this.rightAlias;
    }

    public Type getType() {
        return this.type;
    }

    public boolean isLeftJoin() {
        return this.type == Type.LEFT;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public SchemaKStream buildStream(StreamsBuilder streamsBuilder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> map, SchemaRegistryClient schemaRegistryClient) {
        if (!isLeftJoin()) {
            throw new KsqlException("Join type is not supported yet: " + getType());
        }
        SchemaKTable tableForJoin = tableForJoin(streamsBuilder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, map, schemaRegistryClient);
        SchemaKStream streamForJoin = streamForJoin(getLeft().buildStream(streamsBuilder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, map, schemaRegistryClient), getLeftKeyFieldName());
        return streamForJoin.leftJoin(tableForJoin, getSchema(), getSchema().field(getLeftAlias() + "." + streamForJoin.getKeyField().name()), getResultTopicSerde(this), ksqlConfig);
    }

    SchemaKTable tableForJoin(StreamsBuilder streamsBuilder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> map, SchemaRegistryClient schemaRegistryClient) {
        HashMap hashMap = new HashMap();
        hashMap.putAll(map);
        hashMap.put("auto.offset.reset", "earliest");
        SchemaKStream buildStream = this.right.buildStream(streamsBuilder, ksqlConfig, kafkaTopicClient, metastoreUtil, functionRegistry, hashMap, schemaRegistryClient);
        if (buildStream instanceof SchemaKTable) {
            return (SchemaKTable) buildStream;
        }
        throw new KsqlException("Unsupported Join. Only stream-table joins are supported, but was " + getLeft() + "-" + getRight());
    }

    private KsqlTopicSerDe getResultTopicSerde(PlanNode planNode) {
        return planNode instanceof StructuredDataSourceNode ? ((StructuredDataSourceNode) planNode).getStructuredDataSource().getKsqlTopic().getKsqlTopicSerDe() : planNode instanceof JoinNode ? getResultTopicSerde(((JoinNode) planNode).getLeft()) : getResultTopicSerde(planNode.getSources().get(0));
    }

    private SchemaKStream streamForJoin(SchemaKStream schemaKStream, String str) {
        return (schemaKStream.getKeyField() == null || !schemaKStream.getKeyField().name().equals(str)) ? schemaKStream.selectKey((Field) SchemaUtil.getFieldByName(schemaKStream.getSchema(), str).orElseThrow(() -> {
            return new KsqlException("couldn't find key field: " + str + " in schema:" + this.schema);
        }), true) : schemaKStream;
    }
}
