/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifierProvider;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.SerializableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Source} implementation that reads records from Kafka and emits them as {@code OUT}.
 *
 * <p>The constructor is package-private; instances are obtained through {@link #builder()}.
 *
 * <p>Besides creating the reader and the split enumerator, this class reports the type it produces
 * (via {@link ResultTypeQueryable}) and a lineage vertex describing the consumed topics (via
 * {@link LineageVertexProvider}).
 *
 * @param <OUT> the type of the records produced by this source
 */
@PublicEvolving
public class KafkaSource<OUT>
        implements LineageVertexProvider,
                Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
                ResultTypeQueryable<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
    private static final long serialVersionUID = -8755372893283732098L;

    /** Subscriber handed to the split enumerator; also consulted for lineage information. */
    private final KafkaSubscriber subscriber;

    /** Offsets initializer for the starting position, handed to the split enumerator. */
    private final OffsetsInitializer startingOffsetsInitializer;

    /** Offsets initializer for the stopping position; may be {@code null}. */
    private final OffsetsInitializer stoppingOffsetsInitializer;

    /** Boundedness reported by {@link #getBoundedness()} and handed to the split enumerator. */
    private final Boundedness boundedness;

    /** Schema used by the record emitter and as the source of the produced type information. */
    private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;

    /** Properties passed to the split readers and the enumerator. */
    private final Properties props;

    /** Supplies the rack id passed to each split reader; may be {@code null}. */
    private final SerializableSupplier<String> rackIdSupplier;

    /**
     * Creates a source from already prepared components. No validation is performed here.
     *
     * @param subscriber subscriber handed to the enumerator
     * @param startingOffsetsInitializer initializer for the starting offsets
     * @param stoppingOffsetsInitializer initializer for the stopping offsets, may be {@code null}
     * @param boundedness boundedness of this source
     * @param deserializationSchema schema turning Kafka records into {@code OUT}
     * @param props properties for the readers and the enumerator
     * @param rackIdSupplier supplier of the rack id for split readers, may be {@code null}
     */
    KafkaSource(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetsInitializer,
            @Nullable OffsetsInitializer stoppingOffsetsInitializer,
            Boundedness boundedness,
            KafkaRecordDeserializationSchema<OUT> deserializationSchema,
            Properties props,
            SerializableSupplier<String> rackIdSupplier) {
        this.subscriber = subscriber;
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
        this.boundedness = boundedness;
        this.deserializationSchema = deserializationSchema;
        this.props = props;
        this.rackIdSupplier = rackIdSupplier;
    }

    /**
     * Creates a new builder for a {@link KafkaSource}.
     *
     * @param <OUT> the type of the records the built source will produce
     * @return a fresh {@link KafkaSourceBuilder}
     */
    public static <OUT> KafkaSourceBuilder<OUT> builder() {
        return new KafkaSourceBuilder<>();
    }

    /**
     * Returns the boundedness this source was constructed with.
     *
     * @return the boundedness of this source
     */
    @Override
    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    /**
     * Creates a source reader whose split-finished hook does nothing.
     *
     * @param readerContext context of the reader being created
     * @return the new source reader
     * @throws Exception if opening the deserialization schema or building the reader fails
     */
    @Internal
    @Override
    public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext)
            throws Exception {
        return this.createReader(readerContext, ignore -> {});
    }

    /**
     * Creates a source reader, passing {@code splitFinishedHook} on to the fetcher manager.
     *
     * <p>The deserialization schema is opened first, with an initialization context that exposes
     * a {@code "deserializer"} sub-group of the reader's metric group and the reader's user code
     * class loader.
     *
     * @param readerContext context of the reader being created
     * @param splitFinishedHook callback handed to the {@link KafkaSourceFetcherManager}
     * @return the new source reader
     * @throws Exception if opening the deserialization schema or building the reader fails
     */
    @VisibleForTesting
    SourceReader<OUT, KafkaPartitionSplit> createReader(
            final SourceReaderContext readerContext,
            Consumer<Collection<String>> splitFinishedHook)
            throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>>
                elementsQueue = new FutureCompletingBlockingQueue<>();
        this.deserializationSchema.open(
                new DeserializationSchema.InitializationContext() {

                    @Override
                    public MetricGroup getMetricGroup() {
                        return readerContext.metricGroup().addGroup("deserializer");
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        return readerContext.getUserCodeClassLoader();
                    }
                });
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics =
                new KafkaSourceReaderMetrics(readerContext.metricGroup());
        // The rack id is resolved inside the supplier, i.e. each time a split reader is created;
        // a missing rackIdSupplier results in a null rack id.
        Supplier<KafkaPartitionSplitReader> splitReaderSupplier =
                () ->
                        new KafkaPartitionSplitReader(
                                this.props,
                                readerContext,
                                kafkaSourceReaderMetrics,
                                Optional.ofNullable(this.rackIdSupplier)
                                        .map(Supplier::get)
                                        .orElse(null));
        KafkaRecordEmitter<OUT> recordEmitter =
                new KafkaRecordEmitter<>(this.deserializationSchema);
        return new KafkaSourceReader<>(
                elementsQueue,
                new KafkaSourceFetcherManager(
                        elementsQueue, splitReaderSupplier::get, splitFinishedHook),
                recordEmitter,
                this.toConfiguration(this.props),
                readerContext,
                kafkaSourceReaderMetrics);
    }

    /**
     * Creates a split enumerator without any restored state.
     *
     * @param enumContext context of the enumerator being created
     * @return the new split enumerator
     */
    @Internal
    @Override
    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> createEnumerator(
            SplitEnumeratorContext<KafkaPartitionSplit> enumContext) {
        return new KafkaSourceEnumerator(
                this.subscriber,
                this.startingOffsetsInitializer,
                this.stoppingOffsetsInitializer,
                this.props,
                enumContext,
                this.boundedness);
    }

    /**
     * Creates a split enumerator that is additionally given the checkpointed state.
     *
     * @param enumContext context of the enumerator being created
     * @param checkpoint enumerator state to restore from
     * @return the restored split enumerator
     * @throws IOException declared by the implemented interface method
     */
    @Internal
    @Override
    public SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> restoreEnumerator(
            SplitEnumeratorContext<KafkaPartitionSplit> enumContext,
            KafkaSourceEnumState checkpoint)
            throws IOException {
        return new KafkaSourceEnumerator(
                this.subscriber,
                this.startingOffsetsInitializer,
                this.stoppingOffsetsInitializer,
                this.props,
                enumContext,
                this.boundedness,
                checkpoint);
    }

    /**
     * Returns a new serializer for {@link KafkaPartitionSplit}s.
     *
     * @return the split serializer
     */
    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaPartitionSplit> getSplitSerializer() {
        return new KafkaPartitionSplitSerializer();
    }

    /**
     * Returns a new serializer for the enumerator checkpoint state.
     *
     * @return the enumerator state serializer
     */
    @Internal
    @Override
    public SimpleVersionedSerializer<KafkaSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new KafkaSourceEnumStateSerializer();
    }

    /**
     * Returns the type information reported by the deserialization schema.
     *
     * @return the produced type
     */
    @Override
    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }

    /**
     * Copies all string-valued entries of {@code props} into a new {@link Configuration}.
     *
     * @param props properties to copy
     * @return a configuration holding the string properties
     */
    private Configuration toConfiguration(Properties props) {
        Configuration config = new Configuration();
        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
        return config;
    }

    /** Returns the source properties converted to a {@link Configuration}. */
    @VisibleForTesting
    Configuration getConfiguration() {
        return this.toConfiguration(this.props);
    }

    /** Returns the subscriber this source was constructed with. */
    @VisibleForTesting
    KafkaSubscriber getKafkaSubscriber() {
        return this.subscriber;
    }

    /** Returns the stopping offsets initializer this source was constructed with. */
    @VisibleForTesting
    OffsetsInitializer getStoppingOffsetsInitializer() {
        return this.stoppingOffsetsInitializer;
    }

    /**
     * Builds the lineage vertex for this source.
     *
     * <p>If the subscriber is not a {@link KafkaDatasetIdentifierProvider}, or it provides no
     * dataset identifier, a vertex with an empty dataset list is returned. Otherwise the vertex
     * holds a single dataset built from the namespace derived from the properties, a Kafka facet
     * (identifier plus properties) and a type facet for the produced type.
     *
     * @return the lineage vertex of this source
     */
    @Override
    public SourceLineageVertex getLineageVertex() {
        if (!(this.subscriber instanceof KafkaDatasetIdentifierProvider)) {
            LOG.info("unable to determine topic identifier");
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }
        Optional<DefaultKafkaDatasetIdentifier> topicsIdentifier =
                ((KafkaDatasetIdentifierProvider) this.subscriber).getDatasetIdentifier();
        if (!topicsIdentifier.isPresent()) {
            LOG.info("No topics' identifier returned from subscriber");
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }
        KafkaDatasetFacet kafkaDatasetFacet =
                new DefaultKafkaDatasetFacet(topicsIdentifier.get(), this.props);
        String namespace = LineageUtil.namespaceOf(this.props);
        return LineageUtil.sourceLineageVertexOf(
                Collections.singletonList(
                        LineageUtil.datasetOf(
                                namespace,
                                kafkaDatasetFacet,
                                new DefaultTypeDatasetFacet(this.getProducedType()))));
    }
}

