/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader.fetcher;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherTask;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SingleThreadFetcherManager} specialization for the Kafka source that, in addition to
 * the inherited fetching behavior, lets callers hand an offset commit to the split fetcher as a
 * {@link SplitFetcherTask}.
 */
@Internal
public class KafkaSourceFetcherManager
extends SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceFetcherManager.class);

    /**
     * Creates a new fetcher manager backed by a fresh, empty {@link Configuration}.
     *
     * @param elementsQueue accepted for signature compatibility only; this constructor does not
     *     forward it to the superclass
     * @param splitReaderSupplier supplier of the split reader used by the fetcher
     * @param splitFinishedHook hook passed through to the superclass; presumably invoked with
     *     the ids of finished splits (confirm against the superclass documentation)
     */
    public KafkaSourceFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue, Supplier<SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit>> splitReaderSupplier, Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, new Configuration(), splitFinishedHook);
    }

    /**
     * Schedules a commit of the given offsets on the split fetcher.
     *
     * <p>An empty map is a no-op: nothing is enqueued and {@code callback} is not invoked by this
     * method. Otherwise the commit task is enqueued on the fetcher stored under {@code 0} in
     * {@code fetchers}; if there is none, a new fetcher is created, the task is enqueued on it,
     * and only then is the fetcher started.
     *
     * @param offsetsToCommit offsets to commit, keyed by topic partition; must not be null
     * @param callback callback handed to the Kafka split reader together with the offsets
     */
    public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback callback) {
        LOG.debug("Committing offsets {}", offsetsToCommit);
        if (offsetsToCommit.isEmpty()) {
            return;
        }
        SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher = this.fetchers.get(0);
        if (splitFetcher != null) {
            // A fetcher is already registered: just hand it the commit task.
            this.enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
        } else {
            // No fetcher available: create one, queue the commit first, then start it so the
            // task is already pending when the fetcher begins running.
            splitFetcher = this.createSplitFetcher();
            this.enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback);
            this.startFetcher(splitFetcher);
        }
    }

    /**
     * Enqueues a task on {@code splitFetcher} that forwards the offsets and callback to the
     * fetcher's {@link KafkaPartitionSplitReader}.
     *
     * @param splitFetcher fetcher whose split reader must be a {@link KafkaPartitionSplitReader};
     *     a {@link ClassCastException} is thrown otherwise
     * @param offsetsToCommit offsets passed to the reader
     * @param callback callback passed to the reader
     */
    private void enqueueOffsetsCommitTask(SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher, final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, final OffsetCommitCallback callback) {
        final KafkaPartitionSplitReader kafkaReader = (KafkaPartitionSplitReader)splitFetcher.getSplitReader();
        splitFetcher.enqueueTask(new SplitFetcherTask(){

            @Override
            public boolean run() throws IOException {
                kafkaReader.notifyCheckpointComplete(offsetsToCommit, callback);
                // Always reports true; presumably this tells the fetcher the task is finished
                // (confirm against the SplitFetcherTask contract).
                return true;
            }

            @Override
            public void wakeUp() {
                // Intentionally a no-op: this task keeps no state of its own to interrupt.
            }
        });
    }
}

