package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.ShutdownableThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTaskThread.class */
/**
 * Worker thread that drives a {@link WorkerSinkTask}: it repeatedly polls the task and, on a
 * fixed interval, asks the task to commit offsets.
 *
 * <p>Thread-safety: the commit bookkeeping ({@code committing}, {@code commitSeqno},
 * {@code commitStarted}, {@code commitFailures}) is guarded by this object's monitor, so
 * {@link #onCommitCompleted(Throwable, long)} and {@link #commitFailures()} can be called
 * from a thread other than the one running {@link #iteration()}. {@code nextCommit} is only
 * touched by the constructor and {@link #iteration()} and is not guarded.
 */
public class WorkerSinkTaskThread extends ShutdownableThread {
    // The logger is deliberately left registered under WorkerSinkTask so that the log category
    // seen by existing logging configuration does not change.
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);

    private final WorkerSinkTask task;
    /** Timestamp (ms) at or after which the next offset commit is started. */
    private long nextCommit;
    /** Whether a commit has been started and has neither completed nor timed out. Guarded by {@code this}. */
    private boolean committing;
    /** Sequence number of the most recently started commit. Guarded by {@code this}. */
    private int commitSeqno;
    /** Timestamp (ms) at which the most recent commit was started, or -1 if none. Guarded by {@code this}. */
    private long commitStarted;
    /** Number of failed/timed-out commits since the last successful one. Guarded by {@code this}. */
    private int commitFailures;

    /**
     * Creates the thread and schedules the first commit one commit interval from now.
     *
     * @param task   the sink task this thread drives
     * @param name   the thread name, passed to {@link ShutdownableThread}
     * @param time   clock used to compute the time of the first commit
     * @param config worker configuration supplying {@link WorkerConfig#OFFSET_COMMIT_INTERVAL_MS_CONFIG}
     */
    public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time, WorkerConfig config) {
        super(name);
        this.task = task;
        this.nextCommit = time.milliseconds() + config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1L;
        this.commitFailures = 0;
    }

    /**
     * Thread body: if the task joins its consumer group and starts, runs {@link #iteration()}
     * until the thread is no longer running, then requests a final commit with
     * {@code commitOffsets(true, -1)}.
     */
    @Override
    public void execute() {
        if (!task.joinConsumerGroupAndStart()) {
            return;
        }
        while (getRunning()) {
            iteration();
        }
        task.commitOffsets(true, -1);
    }

    /**
     * Runs one pass of the work loop: starts a commit if one is due and none is in flight,
     * marks an in-flight commit as failed if it has exceeded the commit timeout, and then polls
     * the task for at most the time remaining until the next scheduled commit.
     */
    public void iteration() {
        final long now = task.time().milliseconds();

        // Decide whether to start a commit and update the bookkeeping atomically, capturing the
        // sequence number while the lock is held so the value passed to the task is the one
        // this iteration assigned.
        boolean startCommit = false;
        int seqno = -1;
        synchronized (this) {
            if (!committing && now >= nextCommit) {
                committing = true;
                commitSeqno++;
                commitStarted = now;
                seqno = commitSeqno;
                startCommit = true;
            }
        }
        if (startCommit) {
            // Called outside the lock: the task is external code and must not run under our monitor.
            task.commitOffsets(false, seqno);
            nextCommit += task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
        }

        final long commitTimeoutMs = task.workerConfig().getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        synchronized (this) {
            // Same monitor as onCommitCompleted, so a completion callback and the timeout path
            // cannot interleave and lose an update to commitFailures/committing.
            if (committing && now >= commitStarted + commitTimeoutMs) {
                log.warn("Commit of {} offsets timed out", this);
                commitFailures++;
                committing = false;
            }
        }

        task.poll(Math.max(nextCommit - now, 0L));
    }

    /**
     * Callback reporting the outcome of a commit previously started by {@link #iteration()}.
     * A callback whose sequence number is not the most recent one is only logged; otherwise the
     * failure counter is incremented on error or reset to zero on success, and the in-flight
     * flag is cleared.
     *
     * @param error the failure, or {@code null} if the commit succeeded
     * @param seqno the sequence number that was passed to the task when the commit was started
     */
    public void onCommitCompleted(Throwable error, long seqno) {
        synchronized (this) {
            if (commitSeqno != seqno) {
                log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}", this, seqno, commitSeqno);
                return;
            }
            if (error != null) {
                log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
                commitFailures++;
            } else {
                log.debug("Finished {} offset commit successfully in {} ms", this, task.time().milliseconds() - commitStarted);
                commitFailures = 0;
            }
            committing = false;
        }
    }

    /**
     * Returns the number of commits that have failed or timed out since the last successful commit.
     *
     * @return the current failure count
     */
    public synchronized int commitFailures() {
        return commitFailures;
    }
}
