/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.planner.plan;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.metastore.KsqlTable;
import io.confluent.ksql.metastore.MetastoreUtil;
import io.confluent.ksql.metastore.StructuredDataSource;
import io.confluent.ksql.physical.AddTimestampColumn;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.PlanNodeId;
import io.confluent.ksql.planner.plan.PlanVisitor;
import io.confluent.ksql.serde.DataSource;
import io.confluent.ksql.serde.KsqlTopicSerDe;
import io.confluent.ksql.serde.WindowedSerde;
import io.confluent.ksql.structured.SchemaKStream;
import io.confluent.ksql.structured.SchemaKTable;
import io.confluent.ksql.util.KafkaTopicClient;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.SchemaUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.concurrent.Immutable;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;

/**
 * Leaf plan node that wraps a {@link StructuredDataSource} and builds the Kafka Streams
 * source for it: a {@link SchemaKTable} when the source type is {@code KTABLE}, otherwise a
 * {@link SchemaKStream}.
 */
@Immutable
public class StructuredDataSourceNode
extends PlanNode {
    /**
     * Number of leading schema columns skipped when resolving the timestamp column index.
     * NOTE(review): presumably the implicit ROWTIME and ROWKEY columns (compare
     * {@code SchemaUtil.removeImplicitRowTimeRowKeyFromSchema}) - confirm.
     */
    private static final int IMPLICIT_COLUMN_COUNT = 2;

    /** Inserts the record key at position 0 of the row's columns; mutates the row in place. */
    private static final ValueMapperWithKey<String, GenericRow, GenericRow> nonWindowedValueMapper = (key, row) -> {
        if (row != null) {
            row.getColumns().add(0, key);
        }
        return row;
    };

    /**
     * Inserts a textual form of the windowed key (key plus window start) at position 0 of the
     * row's columns; mutates the row in place.
     */
    private static final ValueMapperWithKey<Windowed<String>, GenericRow, GenericRow> windowedMapper = (key, row) -> {
        if (row != null) {
            row.getColumns().add(0, String.format("%s : Window{start=%d end=-}", key.key(), key.window().start()));
        }
        return row;
    };

    private final WindowedSerde windowedSerde = new WindowedSerde();
    private final StructuredDataSource structuredDataSource;
    private final Schema schema;

    /**
     * Creates the node.
     *
     * @param id                   identifier of this plan node
     * @param structuredDataSource the data source this node reads from; must not be null
     * @param schema               the schema exposed by this node; must not be null
     * @throws NullPointerException if {@code structuredDataSource} or {@code schema} is null
     */
    @JsonCreator
    public StructuredDataSourceNode(@JsonProperty(value="id") PlanNodeId id, @JsonProperty(value="structuredDataSource") StructuredDataSource structuredDataSource, @JsonProperty(value="schema") Schema schema) {
        super(id);
        Objects.requireNonNull(structuredDataSource, "structuredDataSource can't be null");
        Objects.requireNonNull(schema, "schema can't be null");
        this.schema = schema;
        this.structuredDataSource = structuredDataSource;
    }

    /** Returns the topic name reported by the underlying data source. */
    public String getTopicName() {
        return this.structuredDataSource.getTopicName();
    }

    /** Returns the schema supplied at construction time. */
    @Override
    public Schema getSchema() {
        return this.schema;
    }

    /** Returns the key field reported by the underlying data source. */
    @Override
    public Field getKeyField() {
        return this.structuredDataSource.getKeyField();
    }

    /** Returns the wrapped data source. */
    public StructuredDataSource getStructuredDataSource() {
        return this.structuredDataSource;
    }

    /**
     * Returns the upstream plan nodes of this node.
     *
     * @return always {@code null}; kept as-is because callers outside this file may rely on
     *         the null return rather than an empty list
     */
    @Override
    public List<PlanNode> getSources() {
        return null;
    }

    /** Dispatches to {@link PlanVisitor#visitStructuredDataSourceNode}. */
    @Override
    public <C, R> R accept(PlanVisitor<C, R> visitor, C context) {
        return visitor.visitStructuredDataSourceNode(this, context);
    }

    /**
     * Builds the Kafka Streams source for this node.
     *
     * <p>When the data source declares a timestamp field, its resolved column index is written
     * into {@code ksqlConfig} under {@code "ksql.timestamp.column.index"} (the passed config is
     * mutated; {@code -1} is stored as-is if the column is not found in the schema).
     *
     * <p>For a {@code KTABLE} source the topic is read and reduced into a table, honouring the
     * {@code "auto.offset.reset"} entry of {@code props}. For any other source type a plain
     * stream is created; the offset reset policy from {@code props} is not applied on that path.
     *
     * @param builder              the streams builder the source is registered on
     * @param ksqlConfig           configuration; may be mutated as described above
     * @param kafkaTopicClient     not used by this node
     * @param metastoreUtil        not used by this node
     * @param functionRegistry     passed through to the created stream/table wrapper
     * @param props                property overrides; only {@code "auto.offset.reset"} is read
     * @param schemaRegistryClient passed to serde creation and to the created wrapper
     * @return a {@link SchemaKTable} for table sources, otherwise a {@link SchemaKStream}
     */
    @Override
    public SchemaKStream buildStream(StreamsBuilder builder, KsqlConfig ksqlConfig, KafkaTopicClient kafkaTopicClient, MetastoreUtil metastoreUtil, FunctionRegistry functionRegistry, Map<String, Object> props, SchemaRegistryClient schemaRegistryClient) {
        if (this.getTimestampField() != null) {
            int timestampColumnIndex = this.getTimeStampColumnIndex();
            ksqlConfig.put("ksql.timestamp.column.index", (Object)timestampColumnIndex);
        }
        KsqlTopicSerDe ksqlTopicSerDe = this.getStructuredDataSource().getKsqlTopic().getKsqlTopicSerDe();
        // Serde used to read the topic: built from the schema with the implicit columns removed.
        Serde genericRowSerde = ksqlTopicSerDe.getGenericRowSerde(
            SchemaUtil.removeImplicitRowTimeRowKeyFromSchema((Schema)this.getSchema()),
            ksqlConfig,
            false,
            schemaRegistryClient);
        if (this.getDataSourceType() == DataSource.DataSourceType.KTABLE) {
            KsqlTable table = (KsqlTable)this.getStructuredDataSource();
            // The second serde is built from the full schema and is used after the rows have
            // been extended by the value mapper / timestamp transformer.
            KTable kTable = this.createKTable(
                builder,
                this.getAutoOffsetReset(props),
                table,
                (Serde<GenericRow>)genericRowSerde,
                (Serde<GenericRow>)table.getKsqlTopic().getKsqlTopicSerDe().getGenericRowSerde(this.getSchema(), ksqlConfig, true, schemaRegistryClient));
            return new SchemaKTable(
                this.getSchema(),
                kTable,
                this.getKeyField(),
                new ArrayList<SchemaKStream>(),
                table.isWindowed(),
                SchemaKStream.Type.SOURCE,
                functionRegistry,
                schemaRegistryClient);
        }
        return new SchemaKStream(
            this.getSchema(),
            (KStream<String, GenericRow>)builder
                .stream(this.getStructuredDataSource().getKsqlTopic().getKafkaTopicName(), Consumed.with((Serde)Serdes.String(), (Serde)genericRowSerde))
                .mapValues(nonWindowedValueMapper)
                .transformValues((ValueTransformerSupplier)new AddTimestampColumn(), new String[0]),
            this.getKeyField(),
            new ArrayList<SchemaKStream>(),
            SchemaKStream.Type.SOURCE,
            functionRegistry,
            schemaRegistryClient);
    }

    /**
     * Translates the {@code "auto.offset.reset"} entry of {@code props} into a Kafka Streams
     * reset policy.
     *
     * @param props property overrides
     * @return {@code EARLIEST} or {@code LATEST} (matched case-insensitively), or {@code null}
     *         when the entry is absent, mapped to {@code null}, or has any other value
     */
    private Topology.AutoOffsetReset getAutoOffsetReset(Map<String, Object> props) {
        // A single lookup also covers a key that is present but mapped to null, which
        // previously caused a NullPointerException on toString().
        Object configured = props.get("auto.offset.reset");
        if (configured == null) {
            return null;
        }
        String offsetReset = configured.toString();
        if (offsetReset.equalsIgnoreCase("EARLIEST")) {
            return Topology.AutoOffsetReset.EARLIEST;
        }
        if (offsetReset.equalsIgnoreCase("LATEST")) {
            return Topology.AutoOffsetReset.LATEST;
        }
        return null;
    }

    /**
     * Finds the position of the timestamp field within the schema, ignoring the first
     * {@link #IMPLICIT_COLUMN_COUNT} columns.
     *
     * @return the zero-based index relative to the first non-skipped column, or {@code -1}
     *         when no column matches
     */
    private int getTimeStampColumnIndex() {
        String timestampFieldName = this.getTimestampField().name();
        List<Field> fields = this.schema.fields();
        for (int i = IMPLICIT_COLUMN_COUNT; i < fields.size(); ++i) {
            if (namesMatch(timestampFieldName, fields.get(i).name())) {
                return i - IMPLICIT_COLUMN_COUNT;
            }
        }
        return -1;
    }

    /**
     * Compares a timestamp field name with a schema field name. If both or neither contain a
     * {@code '.'}, the names must be equal as written; if only one does, the text after its
     * first {@code '.'} is compared with the other name.
     */
    private static boolean namesMatch(String timestampFieldName, String fieldName) {
        boolean timestampQualified = timestampFieldName.contains(".");
        boolean fieldQualified = fieldName.contains(".");
        if (timestampQualified == fieldQualified) {
            return timestampFieldName.equals(fieldName);
        }
        if (timestampQualified) {
            return stripQualifier(timestampFieldName).equals(fieldName);
        }
        return timestampFieldName.equals(stripQualifier(fieldName));
    }

    /** Returns the part of {@code name} that follows its first {@code '.'}. */
    private static String stripQualifier(String name) {
        return name.substring(name.indexOf(".") + 1);
    }

    /**
     * Reads the table's topic as a stream, prepends the key column, applies
     * {@link AddTimestampColumn}, and reduces the result into a table.
     *
     * @param builder                  the streams builder the source is registered on
     * @param autoOffsetReset          reset policy handed to {@code withOffsetResetPolicy}; may be null
     * @param ksqlTable                the table source; its windowed flag selects the key serde and mapper
     * @param genericRowSerde          value serde used to read the topic
     * @param genericRowSerdeAfterRead value serde used for the grouping/reduce step
     * @return the resulting table
     */
    private KTable createKTable(StreamsBuilder builder, Topology.AutoOffsetReset autoOffsetReset, KsqlTable ksqlTable, Serde<GenericRow> genericRowSerde, Serde<GenericRow> genericRowSerdeAfterRead) {
        if (ksqlTable.isWindowed()) {
            return this.table(
                (KStream)builder
                    .stream(ksqlTable.getKsqlTopic().getKafkaTopicName(), Consumed.with((Serde)this.windowedSerde, genericRowSerde).withOffsetResetPolicy(autoOffsetReset))
                    .mapValues(windowedMapper)
                    .transformValues((ValueTransformerSupplier)new AddTimestampColumn(), new String[0]),
                (Serde)this.windowedSerde,
                genericRowSerdeAfterRead);
        }
        return this.table(
            builder
                .stream(ksqlTable.getKsqlTopic().getKafkaTopicName(), Consumed.with((Serde)Serdes.String(), genericRowSerde).withOffsetResetPolicy(autoOffsetReset))
                .mapValues(nonWindowedValueMapper)
                .transformValues((ValueTransformerSupplier)new AddTimestampColumn(), new String[0]),
            Serdes.String(),
            genericRowSerdeAfterRead);
    }

    /**
     * Groups the stream by key and reduces it so that the newest value for a key replaces the
     * previous one.
     */
    private <K> KTable table(KStream<K, GenericRow> stream, Serde<K> keySerde, Serde<GenericRow> valueSerde) {
        return stream.groupByKey(Serialized.with(keySerde, valueSerde)).reduce((genericRow, newValue) -> newValue);
    }

    /** Returns the source type (stream or table) reported by the underlying data source. */
    public DataSource.DataSourceType getDataSourceType() {
        return this.structuredDataSource.getDataSourceType();
    }

    /** Returns the timestamp field reported by the underlying data source; may be null. */
    public Field getTimestampField() {
        return this.structuredDataSource.getTimestampField();
    }
}

