/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink.internal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
import org.apache.flink.connector.kafka.sink.internal.WritableBackchannel;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.RetriableException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class KafkaCommitter
implements Committer<KafkaCommittable>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.";
    private final Properties kafkaProducerConfig;
    private final boolean reusesTransactionalIds;
    private final BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory;
    private final WritableBackchannel<TransactionFinished> backchannel;
    @Nullable
    private FlinkKafkaInternalProducer<?, ?> committingProducer;

    public KafkaCommitter(Properties kafkaProducerConfig, String transactionalIdPrefix, int subtaskId, int attemptNumber, boolean reusesTransactionalIds, BiFunction<Properties, String, FlinkKafkaInternalProducer<?, ?>> producerFactory) {
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.reusesTransactionalIds = reusesTransactionalIds;
        this.producerFactory = producerFactory;
        this.backchannel = BackchannelFactory.getInstance().getWritableBackchannel(subtaskId, attemptNumber, transactionalIdPrefix);
    }

    @VisibleForTesting
    public WritableBackchannel<TransactionFinished> getBackchannel() {
        return this.backchannel;
    }

    @Nullable
    @VisibleForTesting
    FlinkKafkaInternalProducer<?, ?> getCommittingProducer() {
        return this.committingProducer;
    }

    public void commit(Collection<Committer.CommitRequest<KafkaCommittable>> requests) throws IOException, InterruptedException {
        for (Committer.CommitRequest<KafkaCommittable> request : requests) {
            KafkaCommittable committable = (KafkaCommittable)request.getCommittable();
            String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", (Object)transactionalId);
            Optional<FlinkKafkaInternalProducer<?, ?>> writerProducer = committable.getProducer();
            FlinkKafkaInternalProducer producer = null;
            try {
                producer = writerProducer.orElseGet(() -> this.getProducer(committable));
                producer.commitTransaction();
                this.backchannel.send(TransactionFinished.successful(committable.getTransactionalId()));
            }
            catch (RetriableException e) {
                LOG.warn("Encountered retriable exception while committing {}.", (Object)transactionalId, (Object)e);
                request.retryLater();
            }
            catch (ProducerFencedException e) {
                this.logFencedRequest(request, e);
                this.handleFailedTransaction(producer);
                request.signalFailedWithKnownReason((Throwable)e);
            }
            catch (InvalidTxnStateException e) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", request, (Object)e);
                this.handleFailedTransaction(producer);
                request.signalFailedWithKnownReason((Throwable)e);
            }
            catch (UnknownProducerIdException e) {
                LOG.error("Unable to commit transaction ({}) because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.", request, (Object)e);
                this.handleFailedTransaction(producer);
                request.signalFailedWithKnownReason((Throwable)e);
            }
            catch (Exception e) {
                LOG.error("Transaction ({}) encountered error and data has been potentially lost.", request, (Object)e);
                this.closeCommitterProducer(producer);
                request.signalFailedWithUnknownReason((Throwable)e);
            }
        }
    }

    private void logFencedRequest(Committer.CommitRequest<KafkaCommittable> request, ProducerFencedException e) {
        if (this.reusesTransactionalIds) {
            LOG.warn("Unable to commit transaction ({}) because its producer is already fenced. If this warning appears as part of the recovery of a checkpoint, it is expected in some cases (e.g., aborted checkpoints in previous attempt). If it's outside of recovery, this means that you either have a different sink with the same '{}' or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{request, "transactional.id", "transaction.timeout.ms", this.kafkaProducerConfig.getProperty("transaction.timeout.ms"), e});
        } else {
            LOG.error("Unable to commit transaction ({}) because its producer is already fenced. This means that you either have a different producer with the same '{}' (this is unlikely with the '{}' as all generated ids are unique and shouldn't be reused) or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{request, "transactional.id", KafkaSink.class.getSimpleName(), "transaction.timeout.ms", this.kafkaProducerConfig.getProperty("transaction.timeout.ms"), e});
        }
    }

    private void handleFailedTransaction(FlinkKafkaInternalProducer<?, ?> producer) {
        if (producer == null) {
            return;
        }
        this.backchannel.send(TransactionFinished.erroneously(producer.getTransactionalId()));
        this.closeCommitterProducer(producer);
    }

    private void closeCommitterProducer(FlinkKafkaInternalProducer<?, ?> producer) {
        if (producer == this.committingProducer) {
            this.committingProducer.close();
            this.committingProducer = null;
        }
    }

    @Override
    public void close() throws IOException {
        try {
            IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.backchannel, this.committingProducer});
        }
        catch (Exception e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
    }

    private FlinkKafkaInternalProducer<?, ?> getProducer(KafkaCommittable committable) {
        if (this.committingProducer == null) {
            this.committingProducer = this.producerFactory.apply(this.kafkaProducerConfig, committable.getTransactionalId());
        } else {
            this.committingProducer.setTransactionId(committable.getTransactionalId());
        }
        this.committingProducer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
        return this.committingProducer;
    }
}

