/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaCommittableSerializer;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.sink.KafkaWriter;
import org.apache.flink.connector.kafka.sink.KafkaWriterState;
import org.apache.flink.connector.kafka.sink.KafkaWriterStateSerializer;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
import org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.KafkaCommitter;
import org.apache.flink.connector.kafka.sink.internal.NoopCommitter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that writes records of type {@code IN} to Kafka through a {@link KafkaWriter} and, when
 * the configured {@link DeliveryGuarantee} is {@code EXACTLY_ONCE}, commits them through a
 * {@link KafkaCommitter}.
 *
 * <p>Instances are created via {@link #builder()}; the constructor is package-private.
 *
 * @param <IN> element type accepted by the record serializer and the created writers
 */
@PublicEvolving
public class KafkaSink<IN>
        implements LineageVertexProvider,
                TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable>,
                SupportsPostCommitTopology<KafkaCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);

    private final DeliveryGuarantee deliveryGuarantee;
    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Properties kafkaProducerConfig;
    /** May be {@code null}; {@link #addPostCommitTopology(DataStream)} checks for that. */
    private final String transactionalIdPrefix;
    private final TransactionNamingStrategy transactionNamingStrategy;

    /**
     * Stores the given settings as-is; no validation or defensive copying happens here.
     *
     * @param deliveryGuarantee delivery guarantee that selects the writer and committer types
     * @param kafkaProducerConfig producer properties handed to writers, committers and lineage
     * @param transactionalIdPrefix prefix passed to the exactly-once writer and committer
     * @param recordSerializer schema used by the writers to serialize incoming elements
     * @param transactionNamingStrategy strategy supplying the naming/abort implementations
     */
    KafkaSink(
            DeliveryGuarantee deliveryGuarantee,
            Properties kafkaProducerConfig,
            String transactionalIdPrefix,
            KafkaRecordSerializationSchema<IN> recordSerializer,
            TransactionNamingStrategy transactionNamingStrategy) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.transactionalIdPrefix = transactionalIdPrefix;
        this.recordSerializer = recordSerializer;
        this.transactionNamingStrategy = transactionNamingStrategy;
    }

    /**
     * Creates a new, empty builder for a {@code KafkaSink}.
     *
     * @param <IN> element type of the sink to build
     * @return a fresh {@link KafkaSinkBuilder}
     */
    public static <IN> KafkaSinkBuilder<IN> builder() {
        // Diamond instead of the raw type so the result is a properly parameterized builder.
        return new KafkaSinkBuilder<>();
    }

    /**
     * Creates the committer matching the configured delivery guarantee.
     *
     * @param context committer init context; subtask index and attempt number are read from it
     * @return a {@link KafkaCommitter} for {@code EXACTLY_ONCE}, otherwise a {@link NoopCommitter}
     */
    @Internal
    public Committer<KafkaCommittable> createCommitter(CommitterInitContext context) {
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE) {
            return new NoopCommitter();
        }
        return new KafkaCommitter(
                kafkaProducerConfig,
                transactionalIdPrefix,
                context.getTaskInfo().getIndexOfThisSubtask(),
                context.getTaskInfo().getAttemptNumber(),
                // flag telling the committer whether the POOLING naming strategy is in use
                transactionNamingStrategy == TransactionNamingStrategy.POOLING,
                FlinkKafkaInternalProducer::new);
    }

    /**
     * @return a new {@link KafkaCommittableSerializer}
     */
    @Internal
    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
        return new KafkaCommittableSerializer();
    }

    /**
     * Creates a writer without any recovered state by delegating to
     * {@link #restoreWriter(WriterInitContext, Collection)} with an empty collection.
     *
     * @param context writer init context
     * @return the initialized writer
     * @throws IOException declared by the signature; not thrown by the code in this method itself
     */
    @Internal
    public KafkaWriter<IN> createWriter(WriterInitContext context) throws IOException {
        // Target typing infers Collection<KafkaWriterState>; no raw cast required.
        return restoreWriter(context, Collections.emptyList());
    }

    /**
     * Creates and initializes a writer, passing {@code recoveredState} on when running with
     * {@code EXACTLY_ONCE}.
     *
     * @param context writer init context, also used to derive the serialization schema context
     * @param recoveredState previously snapshotted writer states; only forwarded to the
     *     exactly-once writer, ignored otherwise
     * @return an {@link ExactlyOnceKafkaWriter} for {@code EXACTLY_ONCE}, otherwise a plain
     *     {@link KafkaWriter}; {@code initialize()} has been called on it in both cases
     */
    @Internal
    public KafkaWriter<IN> restoreWriter(
            WriterInitContext context, Collection<KafkaWriterState> recoveredState) {
        final KafkaWriter<IN> writer;
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            writer =
                    new ExactlyOnceKafkaWriter<>(
                            deliveryGuarantee,
                            kafkaProducerConfig,
                            transactionalIdPrefix,
                            context,
                            recordSerializer,
                            context.asSerializationSchemaInitializationContext(),
                            transactionNamingStrategy.getAbortImpl(),
                            transactionNamingStrategy.getImpl(),
                            recoveredState);
        } else {
            writer =
                    new KafkaWriter<>(
                            deliveryGuarantee,
                            kafkaProducerConfig,
                            context,
                            recordSerializer,
                            context.asSerializationSchemaInitializationContext());
        }
        writer.initialize();
        return writer;
    }

    /**
     * @return a new {@link KafkaWriterStateSerializer}
     */
    @Internal
    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
        return new KafkaWriterStateSerializer();
    }

    /**
     * For {@code EXACTLY_ONCE} with a non-null transactional id prefix, walks upstream from the
     * committer's transformation and sets the prefix as co-location group key on every
     * transformation visited. Does nothing otherwise.
     *
     * <p>The walk follows the first input of each transformation and stops at the first one
     * whose output type is not a {@link CommittableMessageTypeInfo} or that already has a
     * co-location group key, so an existing key is never overwritten.
     *
     * @param committer stream of committable messages whose transformation chain is adjusted
     */
    public void addPostCommitTopology(DataStream<CommittableMessage<KafkaCommittable>> committer) {
        if (deliveryGuarantee != DeliveryGuarantee.EXACTLY_ONCE || transactionalIdPrefix == null) {
            return;
        }
        Transformation<?> current = committer.getTransformation();
        while (current.getOutputType() instanceof CommittableMessageTypeInfo
                && current.getCoLocationGroupKey() == null) {
            current.setCoLocationGroupKey(transactionalIdPrefix);
            // NOTE(review): assumes every matching transformation has at least one input;
            // get(0) throws on an empty input list.
            current = current.getInputs().get(0);
        }
    }

    /**
     * @return the producer properties instance held by this sink (not a copy)
     */
    @VisibleForTesting
    protected Properties getKafkaProducerConfig() {
        return kafkaProducerConfig;
    }

    /**
     * Builds the lineage vertex describing the Kafka dataset this sink writes to.
     *
     * <p>An empty vertex is returned (and an info message logged) when the record serializer
     * does not implement {@link KafkaDatasetFacetProvider} or yields no facet. Otherwise the
     * facet is enriched with the producer properties, and a single dataset is reported; a
     * {@link TypeDatasetFacet} is attached when the serializer implements
     * {@link TypeDatasetFacetProvider} and provides one.
     *
     * @return the lineage vertex, possibly without datasets
     */
    public LineageVertex getLineageVertex() {
        // NOTE(review): sourceLineageVertexOf is used although this class is a sink — kept
        // exactly as in the original; confirm against LineageUtil that this is intended.
        if (!(recordSerializer instanceof KafkaDatasetFacetProvider)) {
            LOG.info("recordSerializer does not implement KafkaDatasetFacetProvider: {}", recordSerializer);
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }

        Optional<KafkaDatasetFacet> providedFacet =
                ((KafkaDatasetFacetProvider) recordSerializer).getKafkaDatasetFacet();
        if (!providedFacet.isPresent()) {
            LOG.info("Provider did not return kafka dataset facet");
            return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
        }

        KafkaDatasetFacet kafkaFacet = providedFacet.get();
        kafkaFacet.setProperties(kafkaProducerConfig);
        String namespace = LineageUtil.namespaceOf(kafkaProducerConfig);

        // Properly typed Optional: no Optional<Object> plus downcast as in the decompiled form.
        Optional<TypeDatasetFacet> typeFacet = Optional.empty();
        if (recordSerializer instanceof TypeDatasetFacetProvider) {
            typeFacet = ((TypeDatasetFacetProvider) recordSerializer).getTypeDatasetFacet();
        }

        if (typeFacet.isPresent()) {
            return LineageUtil.sourceLineageVertexOf(
                    Collections.singleton(
                            LineageUtil.datasetOf(namespace, kafkaFacet, typeFacet.get())));
        }
        return LineageUtil.sourceLineageVertexOf(
                Collections.singleton(LineageUtil.datasetOf(namespace, kafkaFacet)));
    }
}

