package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.physical.AddTimestampColumn;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.serde.WindowedSerde;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.concurrent.Immutable;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.Windowed;

@Immutable
/* loaded from: input_file:io/confluent/ksql/planner/plan/StructuredDataSourceNode.class */
public class StructuredDataSourceNode extends PlanNode {
    private static final ValueMapperWithKey<String, GenericRow, GenericRow> nonWindowedValueMapper = (str, genericRow) -> {
        if (genericRow != null) {
            genericRow.getColumns().add(0, str);
        }
        return genericRow;
    };
    private static final ValueMapperWithKey<Windowed<String>, GenericRow, GenericRow> windowedMapper = (windowed, genericRow) -> {
        if (genericRow != null) {
            genericRow.getColumns().add(0, String.format("%s : Window{start=%d end=-}", windowed.key(), Long.valueOf(windowed.window().start())));
        }
        return genericRow;
    };
    private final WindowedSerde windowedSerde;
    private final StructuredDataSource structuredDataSource;
    private final Schema schema;

    @JsonCreator
    public StructuredDataSourceNode(@JsonProperty("id") PlanNodeId planNodeId, @JsonProperty("structuredDataSource") StructuredDataSource structuredDataSource, @JsonProperty("schema") Schema schema) {
        super(planNodeId);
        this.windowedSerde = new WindowedSerde();
        Objects.requireNonNull(structuredDataSource, "structuredDataSource can't be null");
        Objects.requireNonNull(schema, "schema can't be null");
        this.schema = schema;
        this.structuredDataSource = structuredDataSource;
    }

    public String getTopicName() {
        return this.structuredDataSource.getTopicName();
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public Schema getSchema() {
        return this.schema;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public Field getKeyField() {
        return this.structuredDataSource.getKeyField();
    }

    public StructuredDataSource getStructuredDataSource() {
        return this.structuredDataSource;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public List<PlanNode> getSources() {
        return null;
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public <C, R> R accept(PlanVisitor<C, R> planVisitor, C c) {
        return planVisitor.visitStructuredDataSourceNode(this, c);
    }

    @Override // io.confluent.ksql.planner.plan.PlanNode
    public SchemaKStream buildStream(StreamsBuilder streamsBuilder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> map, SchemaRegistryClient schemaRegistryClient) {
        if (getTimestampField() != null) {
            ksqlConfig.put("ksql.timestamp.column.index", Integer.valueOf(getTimeStampColumnIndex()));
        }
        Serde<GenericRow> genericRowSerde = getStructuredDataSource().getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(SchemaUtil.removeImplicitRowTimeRowKeyFromSchema(getSchema()), ksqlConfig, false, schemaRegistryClient);
        if (getDataSourceType() != DataSource.DataSourceType.KTABLE) {
            return new SchemaKStream(getSchema(), streamsBuilder.stream(getStructuredDataSource().getKsqlTopic().getKafkaTopicName(), Consumed.with(Serdes.String(), genericRowSerde)).mapValues(nonWindowedValueMapper).transformValues(new AddTimestampColumn(), new String[0]), getKeyField(), new ArrayList(), SchemaKStream.Type.SOURCE, functionRegistry, schemaRegistryClient);
        }
        KsqlTable ksqlTable = (KsqlTable) getStructuredDataSource();
        return new SchemaKTable(getSchema(), createKTable(streamsBuilder, getAutoOffsetReset(map), ksqlTable, genericRowSerde, ksqlTable.getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(getSchema(), ksqlConfig, true, schemaRegistryClient)), getKeyField(), new ArrayList(), ksqlTable.isWindowed(), SchemaKStream.Type.SOURCE, functionRegistry, schemaRegistryClient);
    }

    private Topology.AutoOffsetReset getAutoOffsetReset(Map<String, Object> map) {
        if (!map.containsKey("auto.offset.reset")) {
            return null;
        }
        String obj = map.get("auto.offset.reset").toString();
        if (obj.equalsIgnoreCase("EARLIEST")) {
            return Topology.AutoOffsetReset.EARLIEST;
        }
        if (obj.equalsIgnoreCase("LATEST")) {
            return Topology.AutoOffsetReset.LATEST;
        }
        return null;
    }

    private int getTimeStampColumnIndex() {
        String name = getTimestampField().name();
        if (name.contains(".")) {
            for (int i = 2; i < this.schema.fields().size(); i++) {
                Field field = (Field) this.schema.fields().get(i);
                if (field.name().contains(".")) {
                    if (name.equals(field.name())) {
                        return i - 2;
                    }
                } else if (name.substring(name.indexOf(".") + 1).equals(field.name())) {
                    return i - 2;
                }
            }
            return -1;
        }
        for (int i2 = 2; i2 < this.schema.fields().size(); i2++) {
            Field field2 = (Field) this.schema.fields().get(i2);
            if (field2.name().contains(".")) {
                if (name.equals(field2.name().substring(field2.name().indexOf(".") + 1))) {
                    return i2 - 2;
                }
            } else if (name.equals(field2.name())) {
                return i2 - 2;
            }
        }
        return -1;
    }

    private KTable createKTable(StreamsBuilder streamsBuilder, Topology.AutoOffsetReset autoOffsetReset, KsqlTable ksqlTable, Serde<GenericRow> serde, Serde<GenericRow> serde2) {
        return ksqlTable.isWindowed() ? table(streamsBuilder.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(), Consumed.with(this.windowedSerde, serde).withOffsetResetPolicy(autoOffsetReset)).mapValues(windowedMapper).transformValues(new AddTimestampColumn(), new String[0]), this.windowedSerde, serde2) : table(streamsBuilder.stream(ksqlTable.getKsqlTopic().getKafkaTopicName(), Consumed.with(Serdes.String(), serde).withOffsetResetPolicy(autoOffsetReset)).mapValues(nonWindowedValueMapper).transformValues(new AddTimestampColumn(), new String[0]), Serdes.String(), serde2);
    }

    private <K> KTable table(KStream<K, GenericRow> kStream, Serde<K> serde, Serde<GenericRow> serde2) {
        return kStream.groupByKey(Serialized.with(serde, serde2)).reduce((genericRow, genericRow2) -> {
            return genericRow2;
        });
    }

    public DataSource.DataSourceType getDataSourceType() {
        return this.structuredDataSource.getDataSourceType();
    }

    public Field getTimestampField() {
        return this.structuredDataSource.getTimestampField();
    }
}
