package org.apache.kafka.connect.integration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.TestSinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/integration/MonitorableSinkConnector.class */
public class MonitorableSinkConnector extends TestSinkConnector {
    private static final Logger log = LoggerFactory.getLogger(MonitorableSinkConnector.class);
    private String connectorName;
    private Map<String, String> commonConfigs;
    private ConnectorHandle connectorHandle;

    /* loaded from: input_file:org/apache/kafka/connect/integration/MonitorableSinkConnector$MonitorableSinkTask.class */
    public static class MonitorableSinkTask extends SinkTask {
        private String connectorName;
        private String taskId;
        TaskHandle taskHandle;
        Map<TopicPartition, Integer> committedOffsets = new HashMap();
        Map<String, Map<Integer, TopicPartition>> cachedTopicPartitions = new HashMap();

        public String version() {
            return "unknown";
        }

        public void start(Map<String, String> map) {
            this.taskId = map.get("task.id");
            this.connectorName = map.get("connector.name");
            this.taskHandle = RuntimeHandles.get().connectorHandle(this.connectorName).taskHandle(this.taskId);
            MonitorableSinkConnector.log.debug("Starting task {}", this.taskId);
            this.taskHandle.recordTaskStart();
        }

        public void open(Collection<TopicPartition> collection) {
            MonitorableSinkConnector.log.debug("Opening partitions {}", collection);
            this.taskHandle.partitionsAssigned(collection);
        }

        public void close(Collection<TopicPartition> collection) {
            MonitorableSinkConnector.log.debug("Closing partitions {}", collection);
            this.taskHandle.partitionsRevoked(collection);
            Map<TopicPartition, Integer> map = this.committedOffsets;
            Objects.requireNonNull(map);
            collection.forEach((v1) -> {
                r1.remove(v1);
            });
        }

        public void put(Collection<SinkRecord> collection) {
            for (SinkRecord sinkRecord : collection) {
                this.taskHandle.record(sinkRecord);
                TopicPartition computeIfAbsent = this.cachedTopicPartitions.computeIfAbsent(sinkRecord.topic(), str -> {
                    return new HashMap();
                }).computeIfAbsent(sinkRecord.kafkaPartition(), num -> {
                    return new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
                });
                this.committedOffsets.put(computeIfAbsent, Integer.valueOf(this.committedOffsets.getOrDefault(computeIfAbsent, 0).intValue() + 1));
                MonitorableSinkConnector.log.trace("Task {} obtained record (key='{}' value='{}')", new Object[]{this.taskId, sinkRecord.key(), sinkRecord.value()});
            }
        }

        public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> map) {
            this.taskHandle.partitionsCommitted(map.keySet());
            map.forEach((topicPartition, offsetAndMetadata) -> {
                int intValue = this.committedOffsets.getOrDefault(topicPartition, 0).intValue();
                if (intValue != 0) {
                    this.taskHandle.commit(intValue);
                    MonitorableSinkConnector.log.debug("Forwarding to framework request to commit {} records for {}", Integer.valueOf(intValue), topicPartition);
                    this.committedOffsets.put(topicPartition, 0);
                }
            });
            return map;
        }

        public void stop() {
            MonitorableSinkConnector.log.info("Stopped {} task {}", getClass().getSimpleName(), this.taskId);
            this.taskHandle.recordTaskStop();
        }
    }

    @Override // org.apache.kafka.connect.runtime.TestSinkConnector
    public void start(Map<String, String> map) {
        this.connectorHandle = RuntimeHandles.get().connectorHandle(map.get("name"));
        this.connectorName = map.get("name");
        this.commonConfigs = map;
        log.info("Starting connector {}", map.get("name"));
        this.connectorHandle.recordConnectorStart();
    }

    @Override // org.apache.kafka.connect.runtime.TestSinkConnector
    public Class<? extends Task> taskClass() {
        return MonitorableSinkTask.class;
    }

    @Override // org.apache.kafka.connect.runtime.TestSinkConnector
    public List<Map<String, String>> taskConfigs(int i) {
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < i; i2++) {
            HashMap hashMap = new HashMap(this.commonConfigs);
            hashMap.put("connector.name", this.connectorName);
            hashMap.put("task.id", this.connectorName + "-" + i2);
            arrayList.add(hashMap);
        }
        return arrayList;
    }

    @Override // org.apache.kafka.connect.runtime.TestSinkConnector
    public void stop() {
        log.info("Stopped {} connector {}", getClass().getSimpleName(), this.connectorName);
        this.connectorHandle.recordConnectorStop();
    }

    @Override // org.apache.kafka.connect.runtime.TestSinkConnector
    public ConfigDef config() {
        return new ConfigDef();
    }
}
