package org.apache.kafka.connect.runtime;

import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.class */
public class SourceTaskOffsetCommitter {
    private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
    private Time time;
    private WorkerConfig config;
    private ScheduledExecutorService commitExecutorService;
    private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter$ScheduledCommitTask.class */
    public static class ScheduledCommitTask {
        ScheduledFuture<?> commitFuture;
        boolean cancelled = false;
        CountDownLatch finishedLatch = null;

        ScheduledCommitTask(ScheduledFuture<?> scheduledFuture) {
            this.commitFuture = scheduledFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SourceTaskOffsetCommitter(Time time, WorkerConfig workerConfig) {
        this.commitExecutorService = null;
        this.time = time;
        this.config = workerConfig;
        this.commitExecutorService = Executors.newSingleThreadScheduledExecutor();
    }

    public void close(long j) {
        this.commitExecutorService.shutdown();
        try {
            if (!this.commitExecutorService.awaitTermination(j, TimeUnit.MILLISECONDS)) {
                log.error("Graceful shutdown of offset commitOffsets thread timed out.");
            }
        } catch (InterruptedException e) {
        }
    }

    public void schedule(final ConnectorTaskId connectorTaskId, final WorkerSourceTask workerSourceTask) {
        synchronized (this.committers) {
            this.committers.put(connectorTaskId, new ScheduledCommitTask(this.commitExecutorService.schedule(new Runnable() { // from class: org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.1
                @Override // java.lang.Runnable
                public void run() {
                    SourceTaskOffsetCommitter.this.commit(connectorTaskId, workerSourceTask);
                }
            }, this.config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG).longValue(), TimeUnit.MILLISECONDS)));
        }
    }

    public void remove(ConnectorTaskId connectorTaskId) {
        ScheduledCommitTask remove;
        synchronized (this.committers) {
            remove = this.committers.remove(connectorTaskId);
            remove.cancelled = true;
            remove.commitFuture.cancel(false);
        }
        if (remove.finishedLatch != null) {
            try {
                remove.finishedLatch.await();
            } catch (InterruptedException e) {
                throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
            }
        }
    }

    public void commit(ConnectorTaskId connectorTaskId, WorkerSourceTask workerSourceTask) {
        synchronized (this.committers) {
            ScheduledCommitTask scheduledCommitTask = this.committers.get(connectorTaskId);
            if (scheduledCommitTask == null || scheduledCommitTask.cancelled) {
                return;
            }
            scheduledCommitTask.finishedLatch = new CountDownLatch(1);
            try {
                try {
                    log.debug("Committing offsets for {}", workerSourceTask);
                    if (!workerSourceTask.commitOffsets()) {
                        log.error("Failed to commit offsets for {}", workerSourceTask);
                    }
                    synchronized (this.committers) {
                        scheduledCommitTask.finishedLatch.countDown();
                        if (!scheduledCommitTask.cancelled) {
                            schedule(connectorTaskId, workerSourceTask);
                        }
                    }
                } catch (Throwable th) {
                    log.error("Unhandled exception when committing {}: ", workerSourceTask, th);
                    synchronized (this.committers) {
                        scheduledCommitTask.finishedLatch.countDown();
                        if (!scheduledCommitTask.cancelled) {
                            schedule(connectorTaskId, workerSourceTask);
                        }
                    }
                }
            } catch (Throwable th2) {
                synchronized (this.committers) {
                    scheduledCommitTask.finishedLatch.countDown();
                    if (!scheduledCommitTask.cancelled) {
                        schedule(connectorTaskId, workerSourceTask);
                    }
                    throw th2;
                }
            }
        }
    }
}
