/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerSinkTaskContext;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ImpersonationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkerSinkTask
extends WorkerTask {
    private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
    private final WorkerConfig workerConfig;
    private final SinkTask task;
    private final ClusterConfigState configState;
    private Map<String, String> taskConfig;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    private final TransformationChain<SinkRecord> transformationChain;
    private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
    private final boolean isTopicTrackingEnabled;
    private final Consumer<byte[], byte[]> consumer;
    private WorkerSinkTaskContext context;
    private final List<SinkRecord> messageBatch;
    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> origOffsets;
    private RuntimeException rebalanceException;
    private long nextCommit;
    private int commitSeqno;
    private long commitStarted;
    private int commitFailures;
    private boolean pausedForRedelivery;
    private boolean committing;
    private boolean taskStopped;
    private final WorkerErrantRecordReporter workerErrantRecordReporter;
    private final Supplier<List<ErrorReporter>> errorReportersSupplier;

    public WorkerSinkTask(ConnectorTaskId id, SinkTask task, TaskStatus.Listener statusListener, TargetState initialState, WorkerConfig workerConfig, ClusterConfigState configState, ConnectMetrics connectMetrics, Converter keyConverter, Converter valueConverter, ErrorHandlingMetrics errorMetrics, HeaderConverter headerConverter, TransformationChain<SinkRecord> transformationChain, Consumer<byte[], byte[]> consumer, ClassLoader loader, Time time, RetryWithToleranceOperator retryWithToleranceOperator, WorkerErrantRecordReporter workerErrantRecordReporter, StatusBackingStore statusBackingStore, Supplier<List<ErrorReporter>> errorReportersSupplier) {
        super(id, statusListener, initialState, loader, connectMetrics, errorMetrics, retryWithToleranceOperator, time, statusBackingStore);
        this.workerConfig = workerConfig;
        this.task = task;
        this.configState = configState;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.headerConverter = headerConverter;
        this.transformationChain = transformationChain;
        this.messageBatch = new ArrayList<SinkRecord>();
        this.lastCommittedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.currentOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.origOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.pausedForRedelivery = false;
        this.rebalanceException = null;
        this.nextCommit = time.milliseconds() + workerConfig.getLong("offset.flush.interval.ms");
        this.committing = false;
        this.commitSeqno = 0;
        this.commitStarted = -1L;
        this.commitFailures = 0;
        this.sinkTaskMetricsGroup = new SinkTaskMetricsGroup(id, connectMetrics);
        this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(this.commitSeqno);
        this.consumer = consumer;
        this.isTopicTrackingEnabled = workerConfig.getBoolean("topic.tracking.enable");
        this.taskStopped = false;
        this.workerErrantRecordReporter = workerErrantRecordReporter;
        this.errorReportersSupplier = errorReportersSupplier;
    }

    @Override
    public void initialize(TaskConfig taskConfig) {
        try {
            this.taskConfig = taskConfig.originalsStrings();
            this.context = new WorkerSinkTaskContext(this.consumer, this, this.configState);
        }
        catch (Throwable t) {
            log.error("{} Task failed initialization and will not be started.", (Object)this, (Object)t);
            this.onFailure(t);
        }
    }

    @Override
    public void stop() {
        super.stop();
        this.consumer.wakeup();
    }

    @Override
    protected void close() {
        try {
            this.task.stop();
        }
        catch (Throwable t) {
            log.warn("Could not stop task", t);
        }
        this.taskStopped = true;
        Utils.closeQuietly(this.consumer, (String)"consumer");
        Utils.closeQuietly(this.transformationChain, (String)"transformation chain");
        Utils.closeQuietly((AutoCloseable)this.retryWithToleranceOperator, (String)"retry operator");
        Utils.closeQuietly((AutoCloseable)this.headerConverter, (String)"header converter");
        this.sinkTaskMetricsGroup.recordPartitionCount(0);
    }

    @Override
    public void removeMetrics() {
        try {
            this.sinkTaskMetricsGroup.close();
        }
        finally {
            super.removeMetrics();
        }
    }

    @Override
    public void transitionTo(TargetState state) {
        super.transitionTo(state);
        this.consumer.wakeup();
    }

    @Override
    public void execute() {
        ImpersonationUtil.maybeRunImpersonated(this.workerConfig, this.taskConfig, () -> {
            this.executeInternal();
            return null;
        });
    }

    public void executeInternal() {
        log.info("{} Executing sink task", (Object)this);
        try (Utils.UncheckedCloseable suppressible = this::closeAllPartitions;){
            while (!this.isStopping()) {
                this.iteration();
            }
        }
        catch (WakeupException e) {
            log.trace("Consumer woken up during initial offset commit attempt, but succeeded during a later attempt");
        }
    }

    protected void iteration() {
        block6: {
            long offsetCommitIntervalMs = this.workerConfig.getLong("offset.flush.interval.ms");
            try {
                long now = this.time.milliseconds();
                if (!this.committing && (this.context.isCommitRequested() || now >= this.nextCommit)) {
                    this.commitOffsets(now, false);
                    this.nextCommit = now + offsetCommitIntervalMs;
                    this.context.clearCommitRequest();
                }
                long commitTimeoutMs = this.commitStarted + this.workerConfig.getLong("offset.flush.timeout.ms");
                if (this.committing && now >= commitTimeoutMs) {
                    log.warn("{} Commit of offsets timed out", (Object)this);
                    ++this.commitFailures;
                    this.committing = false;
                }
                long timeoutMs = Math.max(this.nextCommit - now, 0L);
                this.poll(timeoutMs);
            }
            catch (WakeupException we) {
                log.trace("{} Consumer woken up", (Object)this);
                if (this.isStopping()) {
                    return;
                }
                if (this.shouldPause()) {
                    this.pauseAll();
                    this.onPause();
                    this.context.requestCommit();
                }
                if (this.pausedForRedelivery) break block6;
                this.resumeAll();
                this.onResume();
            }
        }
    }

    private void onCommitCompleted(Throwable error, long seqno, Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        if ((long)this.commitSeqno != seqno) {
            log.debug("{} Received out of order commit callback for sequence number {}, but most recent sequence number is {}", new Object[]{this, seqno, this.commitSeqno});
            this.sinkTaskMetricsGroup.recordOffsetCommitSkip();
        } else {
            long durationMillis = this.time.milliseconds() - this.commitStarted;
            if (error != null) {
                log.error("{} Commit of offsets threw an unexpected exception for sequence number {}: {}", new Object[]{this, seqno, committedOffsets, error});
                ++this.commitFailures;
                this.recordCommitFailure(durationMillis, error);
            } else {
                log.debug("{} Finished offset commit successfully in {} ms for sequence number {}: {}", new Object[]{this, durationMillis, seqno, committedOffsets});
                if (committedOffsets != null) {
                    log.trace("{} Adding to last committed offsets: {}", (Object)this, committedOffsets);
                    this.lastCommittedOffsets.putAll(committedOffsets);
                    log.debug("{} Last committed offsets are now {}", (Object)this, committedOffsets);
                    this.sinkTaskMetricsGroup.recordCommittedOffsets(committedOffsets);
                }
                this.commitFailures = 0;
                this.recordCommitSuccess(durationMillis);
            }
            this.committing = false;
        }
    }

    public int commitFailures() {
        return this.commitFailures;
    }

    @Override
    public void initializeAndStart() {
        ImpersonationUtil.maybeRunImpersonated(this.workerConfig, this.taskConfig, () -> {
            this.initializeAndStartInternal();
            return null;
        });
    }

    protected void initializeAndStartInternal() {
        SinkConnectorConfig.validate(this.taskConfig);
        this.retryWithToleranceOperator.reporters(this.errorReportersSupplier.get());
        if (SinkConnectorConfig.hasTopicsConfig(this.taskConfig)) {
            List<String> topics = SinkConnectorConfig.parseTopicsList(this.taskConfig);
            this.consumer.subscribe(topics, (ConsumerRebalanceListener)new HandleRebalance());
            log.debug("{} Initializing and starting task for topics {}", (Object)this, (Object)Utils.join(topics, (String)", "));
        } else {
            String topicsRegexStr = this.taskConfig.get("topics.regex");
            Pattern pattern = Pattern.compile(topicsRegexStr);
            this.consumer.subscribe(pattern, (ConsumerRebalanceListener)new HandleRebalance());
            log.debug("{} Initializing and starting task for topics regex {}", (Object)this, (Object)topicsRegexStr);
        }
        this.task.initialize((SinkTaskContext)this.context);
        this.task.start(this.taskConfig);
        log.info("{} Sink task finished initialization and start", (Object)this);
    }

    protected void poll(long timeoutMs) {
        this.rewind();
        long retryTimeout = this.context.timeout();
        if (retryTimeout > 0L) {
            timeoutMs = Math.min(timeoutMs, retryTimeout);
            this.context.timeout(-1L);
        }
        log.trace("{} Polling consumer with timeout {} ms", (Object)this, (Object)timeoutMs);
        ConsumerRecords<byte[], byte[]> msgs = this.pollConsumer(timeoutMs);
        assert (this.messageBatch.isEmpty() || msgs.isEmpty());
        log.trace("{} Polling returned {} messages", (Object)this, (Object)msgs.count());
        this.convertMessages(msgs);
        this.deliverMessages();
    }

    boolean isCommitting() {
        return this.committing;
    }

    private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
        log.debug("{} Committing offsets synchronously using sequence number {}: {}", new Object[]{this, seqno, offsets});
        try {
            this.consumer.commitSync(offsets);
            this.onCommitCompleted(null, seqno, offsets);
        }
        catch (WakeupException e) {
            this.doCommitSync(offsets, seqno);
            throw e;
        }
        catch (KafkaException e) {
            this.onCommitCompleted(e, seqno, offsets);
        }
    }

    private void doCommitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
        log.debug("{} Committing offsets asynchronously using sequence number {}: {}", new Object[]{this, seqno, offsets});
        OffsetCommitCallback cb = (tpOffsets, error) -> this.onCommitCompleted(error, seqno, tpOffsets);
        this.consumer.commitAsync(offsets, cb);
    }

    private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, int seqno) {
        if (this.isCancelled()) {
            log.debug("Skipping final offset commit as task has been cancelled");
            return;
        }
        if (closing) {
            this.doCommitSync(offsets, seqno);
        } else {
            this.doCommitAsync(offsets, seqno);
        }
    }

    private void commitOffsets(long now, boolean closing) {
        this.commitOffsets(now, closing, this.consumer.assignment());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void commitOffsets(long now, boolean closing, Collection<TopicPartition> topicPartitions) {
        Map taskProvidedOffsets;
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
        log.trace("Committing offsets for partitions {}", topicPartitions);
        if (this.workerErrantRecordReporter != null) {
            log.trace("Awaiting reported errors to be completed");
            this.workerErrantRecordReporter.awaitFutures(topicPartitions);
            log.trace("Completed reported errors");
        }
        if ((offsetsToCommit = this.currentOffsets.entrySet().stream().filter(e -> topicPartitions.contains(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).isEmpty()) {
            return;
        }
        this.committing = true;
        ++this.commitSeqno;
        this.commitStarted = now;
        this.sinkTaskMetricsGroup.recordOffsetSequenceNumber(this.commitSeqno);
        Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsetsForPartitions = this.lastCommittedOffsets.entrySet().stream().filter(e -> offsetsToCommit.containsKey(e.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        try {
            log.trace("{} Calling task.preCommit with current offsets: {}", (Object)this, offsetsToCommit);
            taskProvidedOffsets = this.task.preCommit(new HashMap<TopicPartition, OffsetAndMetadata>(offsetsToCommit));
        }
        catch (Throwable t) {
            if (closing) {
                log.warn("{} Offset commit failed during close", (Object)this);
            } else {
                log.error("{} Offset commit failed, rewinding to last committed offsets", (Object)this, (Object)t);
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsetsForPartitions.entrySet()) {
                    log.debug("{} Rewinding topic partition {} to offset {}", new Object[]{this, entry.getKey(), entry.getValue().offset()});
                    this.consumer.seek(entry.getKey(), entry.getValue().offset());
                }
                this.currentOffsets.putAll(lastCommittedOffsetsForPartitions);
            }
            this.onCommitCompleted(t, this.commitSeqno, null);
            return;
        }
        finally {
            if (closing) {
                log.trace("{} Closing the task before committing the offsets: {}", (Object)this, offsetsToCommit);
                this.task.close(topicPartitions);
            }
        }
        if (taskProvidedOffsets.isEmpty()) {
            log.debug("{} Skipping offset commit, task opted-out by returning no offsets from preCommit", (Object)this);
            this.onCommitCompleted(null, this.commitSeqno, null);
            return;
        }
        Set allAssignedTopicPartitions = this.consumer.assignment();
        HashMap<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(lastCommittedOffsetsForPartitions);
        for (Map.Entry taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
            TopicPartition partition = (TopicPartition)taskProvidedOffsetEntry.getKey();
            OffsetAndMetadata taskProvidedOffset = (OffsetAndMetadata)taskProvidedOffsetEntry.getValue();
            if (committableOffsets.containsKey(partition)) {
                long currentOffset;
                long taskOffset = taskProvidedOffset.offset();
                if (taskOffset <= (currentOffset = offsetsToCommit.get(partition).offset())) {
                    committableOffsets.put(partition, taskProvidedOffset);
                    continue;
                }
                log.warn("{} Ignoring invalid task provided offset {}/{} -- not yet consumed, taskOffset={} currentOffset={}", new Object[]{this, partition, taskProvidedOffset, taskOffset, currentOffset});
                continue;
            }
            if (!allAssignedTopicPartitions.contains(partition)) {
                log.warn("{} Ignoring invalid task provided offset {}/{} -- partition not assigned, assignment={}", new Object[]{this, partition, taskProvidedOffset, allAssignedTopicPartitions});
                continue;
            }
            log.debug("{} Ignoring task provided offset {}/{} -- partition not requested, requested={}", new Object[]{this, partition, taskProvidedOffset, committableOffsets.keySet()});
        }
        if (committableOffsets.equals(lastCommittedOffsetsForPartitions)) {
            log.debug("{} Skipping offset commit, no change since last commit", (Object)this);
            this.onCommitCompleted(null, this.commitSeqno, null);
            return;
        }
        this.doCommit(committableOffsets, closing, this.commitSeqno);
    }

    public String toString() {
        return "WorkerSinkTask{id=" + this.id + '}';
    }

    private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
        ConsumerRecords msgs = this.consumer.poll(Duration.ofMillis(timeoutMs));
        if (this.rebalanceException != null) {
            RuntimeException e = this.rebalanceException;
            this.rebalanceException = null;
            throw e;
        }
        this.sinkTaskMetricsGroup.recordRead(msgs.count());
        return msgs;
    }

    private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
        for (ConsumerRecord msg : msgs) {
            log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}", new Object[]{this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp()});
            this.retryWithToleranceOperator.consumerRecord((ConsumerRecord<byte[], byte[]>)msg);
            SinkRecord transRecord = this.convertAndTransformRecord((ConsumerRecord<byte[], byte[]>)msg);
            this.origOffsets.put(new TopicPartition(msg.topic(), msg.partition()), new OffsetAndMetadata(msg.offset() + 1L));
            if (transRecord != null) {
                this.messageBatch.add(transRecord);
                continue;
            }
            log.trace("{} Converters and transformations returned null, possibly because of too many retries, so dropping record in topic '{}' partition {} at offset {}", new Object[]{this, msg.topic(), msg.partition(), msg.offset()});
        }
        this.sinkTaskMetricsGroup.recordConsumedOffsets(this.origOffsets);
    }

    private SinkRecord convertAndTransformRecord(ConsumerRecord<byte[], byte[]> msg) {
        SinkRecord transformedRecord;
        SchemaAndValue keyAndSchema = (SchemaAndValue)this.retryWithToleranceOperator.execute(() -> this.keyConverter.toConnectData(msg.topic(), msg.headers(), (byte[])msg.key()), Stage.KEY_CONVERTER, this.keyConverter.getClass());
        SchemaAndValue valueAndSchema = (SchemaAndValue)this.retryWithToleranceOperator.execute(() -> this.valueConverter.toConnectData(msg.topic(), msg.headers(), (byte[])msg.value()), Stage.VALUE_CONVERTER, this.valueConverter.getClass());
        org.apache.kafka.connect.header.Headers headers = (org.apache.kafka.connect.header.Headers)this.retryWithToleranceOperator.execute(() -> this.convertHeadersFor(msg), Stage.HEADER_CONVERTER, this.headerConverter.getClass());
        if (this.retryWithToleranceOperator.failed()) {
            return null;
        }
        Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
        SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset(), timestamp, msg.timestampType(), (Iterable)headers);
        log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}", new Object[]{this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value()});
        if (this.isTopicTrackingEnabled) {
            this.recordActiveTopic(origRecord.topic());
        }
        if ((transformedRecord = this.transformationChain.apply(origRecord)) == null) {
            return null;
        }
        return new InternalSinkRecord(msg, transformedRecord);
    }

    private org.apache.kafka.connect.header.Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
        ConnectHeaders result = new ConnectHeaders();
        Headers recordHeaders = record.headers();
        if (recordHeaders != null) {
            String topic = record.topic();
            for (Header recordHeader : recordHeaders) {
                SchemaAndValue schemaAndValue = this.headerConverter.toConnectHeader(topic, recordHeader.key(), recordHeader.value());
                result.add(recordHeader.key(), schemaAndValue);
            }
        }
        return result;
    }

    protected WorkerErrantRecordReporter workerErrantRecordReporter() {
        return this.workerErrantRecordReporter;
    }

    private void resumeAll() {
        for (TopicPartition tp : this.consumer.assignment()) {
            if (this.context.pausedPartitions().contains(tp)) continue;
            this.consumer.resume(Collections.singleton(tp));
        }
    }

    private void pauseAll() {
        this.consumer.pause((Collection)this.consumer.assignment());
    }

    private void deliverMessages() {
        try {
            log.trace("{} Delivering batch of {} messages to task", (Object)this, (Object)this.messageBatch.size());
            long start = this.time.milliseconds();
            this.task.put(new ArrayList<SinkRecord>(this.messageBatch));
            if (this.retryWithToleranceOperator.failed() && !this.retryWithToleranceOperator.withinToleranceLimits()) {
                throw new ConnectException("Tolerance exceeded in error handler", this.retryWithToleranceOperator.error());
            }
            this.recordBatch(this.messageBatch.size());
            this.sinkTaskMetricsGroup.recordPut(this.time.milliseconds() - start);
            this.currentOffsets.putAll(this.origOffsets);
            this.origOffsets.clear();
            this.messageBatch.clear();
            if (this.pausedForRedelivery) {
                if (!this.shouldPause()) {
                    this.resumeAll();
                }
                this.pausedForRedelivery = false;
            }
        }
        catch (RetriableException e) {
            log.error("{} RetriableException from SinkTask:", (Object)this, (Object)e);
            if (!this.pausedForRedelivery) {
                this.pausedForRedelivery = true;
                this.pauseAll();
            }
        }
        catch (Throwable t) {
            log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: {}", new Object[]{this, t.getMessage(), t});
            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
        }
    }

    private void rewind() {
        Map<TopicPartition, Long> offsets = this.context.offsets();
        if (offsets.isEmpty()) {
            return;
        }
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            TopicPartition tp = entry.getKey();
            Long offset = entry.getValue();
            if (offset != null) {
                log.trace("{} Rewind {} to offset {}", new Object[]{this, tp, offset});
                this.consumer.seek(tp, offset.longValue());
                this.lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset.longValue()));
                this.currentOffsets.put(tp, new OffsetAndMetadata(offset.longValue()));
                continue;
            }
            log.warn("{} Cannot rewind {} to null offset", (Object)this, (Object)tp);
        }
        this.context.clearOffsets();
    }

    private void openPartitions(Collection<TopicPartition> partitions) {
        this.task.open(partitions);
    }

    private void closeAllPartitions() {
        this.closePartitions(this.currentOffsets.keySet(), false);
    }

    private void closePartitions(Collection<TopicPartition> topicPartitions, boolean lost) {
        if (!lost) {
            this.commitOffsets(this.time.milliseconds(), true, topicPartitions);
        } else {
            log.trace("{} Closing the task as partitions have been lost: {}", (Object)this, topicPartitions);
            this.task.close(topicPartitions);
            if (this.workerErrantRecordReporter != null) {
                log.trace("Cancelling reported errors for {}", topicPartitions);
                this.workerErrantRecordReporter.cancelFutures(topicPartitions);
                log.trace("Cancelled all reported errors for {}", topicPartitions);
            }
            this.origOffsets.keySet().removeAll(topicPartitions);
            this.currentOffsets.keySet().removeAll(topicPartitions);
        }
        this.lastCommittedOffsets.keySet().removeAll(topicPartitions);
    }

    private void updatePartitionCount() {
        this.sinkTaskMetricsGroup.recordPartitionCount(this.consumer.assignment().size());
    }

    @Override
    protected void recordBatch(int size) {
        super.recordBatch(size);
        this.sinkTaskMetricsGroup.recordSend(size);
    }

    @Override
    protected void recordCommitSuccess(long duration) {
        super.recordCommitSuccess(duration);
        this.sinkTaskMetricsGroup.recordOffsetCommitSuccess();
    }

    SinkTaskMetricsGroup sinkTaskMetricsGroup() {
        return this.sinkTaskMetricsGroup;
    }

    long getNextCommit() {
        return this.nextCommit;
    }

    static class SinkTaskMetricsGroup {
        private final ConnectorTaskId id;
        private final ConnectMetrics metrics;
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor sinkRecordRead;
        private final Sensor sinkRecordSend;
        private final Sensor partitionCount;
        private final Sensor offsetSeqNum;
        private final Sensor offsetCompletion;
        private final Sensor offsetCompletionSkip;
        private final Sensor putBatchTime;
        private final Sensor sinkRecordActiveCount;
        private long activeRecords;
        private Map<TopicPartition, OffsetAndMetadata> consumedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        private Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();

        public SinkTaskMetricsGroup(ConnectorTaskId id, ConnectMetrics connectMetrics) {
            this.metrics = connectMetrics;
            this.id = id;
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.sinkTaskGroupName(), registry.connectorTagName(), id.connector(), registry.taskTagName(), Integer.toString(id.task()));
            this.metricGroup.close();
            this.sinkRecordRead = this.metricGroup.sensor("sink-record-read");
            this.sinkRecordRead.add(this.metricGroup.metricName(registry.sinkRecordReadRate), (MeasurableStat)new Rate());
            this.sinkRecordRead.add(this.metricGroup.metricName(registry.sinkRecordReadTotal), (MeasurableStat)new CumulativeSum());
            this.sinkRecordSend = this.metricGroup.sensor("sink-record-send");
            this.sinkRecordSend.add(this.metricGroup.metricName(registry.sinkRecordSendRate), (MeasurableStat)new Rate());
            this.sinkRecordSend.add(this.metricGroup.metricName(registry.sinkRecordSendTotal), (MeasurableStat)new CumulativeSum());
            this.sinkRecordActiveCount = this.metricGroup.sensor("sink-record-active-count");
            this.sinkRecordActiveCount.add(this.metricGroup.metricName(registry.sinkRecordActiveCount), (MeasurableStat)new Value());
            this.sinkRecordActiveCount.add(this.metricGroup.metricName(registry.sinkRecordActiveCountMax), (MeasurableStat)new Max());
            this.sinkRecordActiveCount.add(this.metricGroup.metricName(registry.sinkRecordActiveCountAvg), (MeasurableStat)new Avg());
            this.partitionCount = this.metricGroup.sensor("partition-count");
            this.partitionCount.add(this.metricGroup.metricName(registry.sinkRecordPartitionCount), (MeasurableStat)new Value());
            this.offsetSeqNum = this.metricGroup.sensor("offset-seq-number");
            this.offsetSeqNum.add(this.metricGroup.metricName(registry.sinkRecordOffsetCommitSeqNum), (MeasurableStat)new Value());
            this.offsetCompletion = this.metricGroup.sensor("offset-commit-completion");
            this.offsetCompletion.add(this.metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionRate), (MeasurableStat)new Rate());
            this.offsetCompletion.add(this.metricGroup.metricName(registry.sinkRecordOffsetCommitCompletionTotal), (MeasurableStat)new CumulativeSum());
            this.offsetCompletionSkip = this.metricGroup.sensor("offset-commit-completion-skip");
            this.offsetCompletionSkip.add(this.metricGroup.metricName(registry.sinkRecordOffsetCommitSkipRate), (MeasurableStat)new Rate());
            this.offsetCompletionSkip.add(this.metricGroup.metricName(registry.sinkRecordOffsetCommitSkipTotal), (MeasurableStat)new CumulativeSum());
            this.putBatchTime = this.metricGroup.sensor("put-batch-time");
            this.putBatchTime.add(this.metricGroup.metricName(registry.sinkRecordPutBatchTimeMax), (MeasurableStat)new Max());
            this.putBatchTime.add(this.metricGroup.metricName(registry.sinkRecordPutBatchTimeAvg), (MeasurableStat)new Avg());
        }

        void computeSinkRecordLag() {
            Map<TopicPartition, OffsetAndMetadata> consumed = this.consumedOffsets;
            Map<TopicPartition, OffsetAndMetadata> committed = this.committedOffsets;
            this.activeRecords = 0L;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committedOffsetEntry : committed.entrySet()) {
                TopicPartition partition = committedOffsetEntry.getKey();
                OffsetAndMetadata consumedOffsetMeta = consumed.get(partition);
                if (consumedOffsetMeta == null) continue;
                OffsetAndMetadata committedOffsetMeta = committedOffsetEntry.getValue();
                long consumedOffset = consumedOffsetMeta.offset();
                long committedOffset = committedOffsetMeta.offset();
                long diff = consumedOffset - committedOffset;
                this.activeRecords += Math.max(diff, 0L);
            }
            this.sinkRecordActiveCount.record((double)this.activeRecords);
        }

        void close() {
            this.metricGroup.close();
        }

        void recordRead(int batchSize) {
            this.sinkRecordRead.record((double)batchSize);
        }

        void recordSend(int batchSize) {
            this.sinkRecordSend.record((double)batchSize);
        }

        void recordPut(long duration) {
            this.putBatchTime.record((double)duration);
        }

        void recordPartitionCount(int assignedPartitionCount) {
            this.partitionCount.record((double)assignedPartitionCount);
        }

        void recordOffsetSequenceNumber(int seqNum) {
            this.offsetSeqNum.record((double)seqNum);
        }

        void recordConsumedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.consumedOffsets.putAll(offsets);
            this.computeSinkRecordLag();
        }

        void recordCommittedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.committedOffsets = offsets;
            this.computeSinkRecordLag();
        }

        void assignedOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.consumedOffsets = new HashMap<TopicPartition, OffsetAndMetadata>(offsets);
            this.committedOffsets = offsets;
            this.computeSinkRecordLag();
        }

        void clearOffsets(Collection<TopicPartition> topicPartitions) {
            this.consumedOffsets.keySet().removeAll(topicPartitions);
            this.committedOffsets.keySet().removeAll(topicPartitions);
            this.computeSinkRecordLag();
        }

        void recordOffsetCommitSuccess() {
            this.offsetCompletion.record(1.0);
        }

        void recordOffsetCommitSkip() {
            this.offsetCompletionSkip.record(1.0);
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    private class HandleRebalance
    implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.debug("{} Partitions assigned {}", (Object)WorkerSinkTask.this, partitions);
            for (TopicPartition tp : partitions) {
                long pos = WorkerSinkTask.this.consumer.position(tp);
                WorkerSinkTask.this.lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
                WorkerSinkTask.this.currentOffsets.put(tp, new OffsetAndMetadata(pos));
                log.debug("{} Assigned topic partition {} with offset {}", new Object[]{WorkerSinkTask.this, tp, pos});
            }
            WorkerSinkTask.this.sinkTaskMetricsGroup.assignedOffsets(WorkerSinkTask.this.currentOffsets);
            boolean wasPausedForRedelivery = WorkerSinkTask.this.pausedForRedelivery;
            WorkerSinkTask.this.pausedForRedelivery = wasPausedForRedelivery && !WorkerSinkTask.this.messageBatch.isEmpty();
            if (WorkerSinkTask.this.pausedForRedelivery) {
                WorkerSinkTask.this.pauseAll();
            } else {
                if (wasPausedForRedelivery) {
                    WorkerSinkTask.this.resumeAll();
                }
                WorkerSinkTask.this.context.pausedPartitions().retainAll(WorkerSinkTask.this.consumer.assignment());
                if (WorkerSinkTask.this.shouldPause()) {
                    WorkerSinkTask.this.pauseAll();
                } else if (!WorkerSinkTask.this.context.pausedPartitions().isEmpty()) {
                    WorkerSinkTask.this.consumer.pause(WorkerSinkTask.this.context.pausedPartitions());
                }
            }
            WorkerSinkTask.this.updatePartitionCount();
            if (partitions.isEmpty()) {
                return;
            }
            if (WorkerSinkTask.this.rebalanceException == null || WorkerSinkTask.this.rebalanceException instanceof WakeupException) {
                try {
                    WorkerSinkTask.this.openPartitions(partitions);
                    WorkerSinkTask.this.rewind();
                }
                catch (RuntimeException e) {
                    WorkerSinkTask.this.rebalanceException = e;
                }
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.onPartitionsRemoved(partitions, false);
        }

        public void onPartitionsLost(Collection<TopicPartition> partitions) {
            this.onPartitionsRemoved(partitions, true);
        }

        private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean lost) {
            if (WorkerSinkTask.this.taskStopped) {
                log.trace("Skipping partition revocation callback as task has already been stopped");
                return;
            }
            log.debug("{} Partitions {}: {}", new Object[]{WorkerSinkTask.this, lost ? "lost" : "revoked", partitions});
            if (partitions.isEmpty()) {
                return;
            }
            try {
                WorkerSinkTask.this.closePartitions(partitions, lost);
                WorkerSinkTask.this.sinkTaskMetricsGroup.clearOffsets(partitions);
            }
            catch (RuntimeException e) {
                WorkerSinkTask.this.rebalanceException = e;
            }
            WorkerSinkTask.this.messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition().intValue())));
        }
    }
}

