package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/Worker.class */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);
    private Time time;
    private WorkerConfig config;
    private Converter keyConverter;
    private Converter valueConverter;
    private Converter internalKeyConverter;
    private Converter internalValueConverter;
    private OffsetBackingStore offsetBackingStore;
    private HashMap<String, Connector> connectors;
    private HashMap<ConnectorTaskId, WorkerTask> tasks;
    private KafkaProducer<byte[], byte[]> producer;
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

    public Worker(WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore) {
        this(new SystemTime(), workerConfig, offsetBackingStore);
    }

    public Worker(Time time, WorkerConfig workerConfig, OffsetBackingStore offsetBackingStore) {
        this.connectors = new HashMap<>();
        this.tasks = new HashMap<>();
        this.time = time;
        this.config = workerConfig;
        this.keyConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
        this.keyConverter.configure(workerConfig.originalsWithPrefix("key.converter."), true);
        this.valueConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
        this.valueConverter.configure(workerConfig.originalsWithPrefix("value.converter."), false);
        this.internalKeyConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
        this.internalKeyConverter.configure(workerConfig.originalsWithPrefix("internal.key.converter."), true);
        this.internalValueConverter = (Converter) workerConfig.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
        this.internalValueConverter.configure(workerConfig.originalsWithPrefix("internal.value.converter."), false);
        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(workerConfig.originals());
    }

    public void start() {
        log.info("Worker starting");
        HashMap hashMap = new HashMap();
        hashMap.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(this.config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
        hashMap.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        Integer num = Integer.MAX_VALUE;
        hashMap.put("request.timeout.ms", num.toString());
        Integer num2 = Integer.MAX_VALUE;
        hashMap.put("retries", num2.toString());
        Long l = Long.MAX_VALUE;
        hashMap.put("max.block.ms", l.toString());
        hashMap.put("acks", "all");
        hashMap.put("max.in.flight.requests.per.connection", "1");
        hashMap.putAll(this.config.originalsWithPrefix("producer."));
        this.producer = new KafkaProducer<>(hashMap);
        this.offsetBackingStore.start();
        this.sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(this.time, this.config);
        log.info("Worker started");
    }

    public void stop() {
        log.info("Worker stopping");
        long milliseconds = this.time.milliseconds() + this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue();
        Iterator<Map.Entry<String, Connector>> it = this.connectors.entrySet().iterator();
        while (it.hasNext()) {
            Connector value = it.next().getValue();
            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before theWorker is stopped.", value);
            try {
                value.stop();
            } catch (ConnectException e) {
                log.error("Error while shutting down connector " + value, e);
            }
        }
        Iterator<Map.Entry<ConnectorTaskId, WorkerTask>> it2 = this.tasks.entrySet().iterator();
        while (it2.hasNext()) {
            WorkerTask value2 = it2.next().getValue();
            log.warn("Shutting down task {} uncleanly; herder should have shut down tasks before the Worker is stopped.", value2);
            try {
                value2.stop();
            } catch (ConnectException e2) {
                log.error("Error while shutting down task " + value2, e2);
            }
        }
        Iterator<Map.Entry<ConnectorTaskId, WorkerTask>> it3 = this.tasks.entrySet().iterator();
        while (it3.hasNext()) {
            WorkerTask value3 = it3.next().getValue();
            log.debug("Waiting for task {} to finish shutting down", value3);
            if (!value3.awaitStop(Math.max(milliseconds - this.time.milliseconds(), 0L))) {
                log.error("Graceful shutdown of task {} failed.", value3);
            }
            value3.close();
        }
        this.sourceTaskOffsetCommitter.close(milliseconds - this.time.milliseconds());
        this.offsetBackingStore.stop();
        log.info("Worker stopped");
    }

    public WorkerConfig config() {
        return this.config;
    }

    public void addConnector(ConnectorConfig connectorConfig, ConnectorContext connectorContext) {
        String string = connectorConfig.getString(ConnectorConfig.NAME_CONFIG);
        Class cls = connectorConfig.getClass(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
        log.info("Creating connector {} of type {}", string, cls.getName());
        try {
            Class asSubclass = cls.asSubclass(Connector.class);
            if (this.connectors.containsKey(string)) {
                throw new ConnectException("Connector with name " + string + " already exists");
            }
            Connector instantiateConnector = instantiateConnector(asSubclass);
            log.info("Instantiated connector {} with version {} of type {}", new Object[]{string, instantiateConnector.version(), asSubclass.getName()});
            instantiateConnector.initialize(connectorContext);
            try {
                instantiateConnector.start(connectorConfig.originalsStrings());
                this.connectors.put(string, instantiateConnector);
                log.info("Finished creating connector {}", string);
            } catch (ConnectException e) {
                throw new ConnectException("Connector threw an exception while starting", e);
            }
        } catch (ClassCastException e2) {
            throw new ConnectException("Specified class is not a subclass of Connector: " + cls.getName());
        }
    }

    private static Connector instantiateConnector(Class<? extends Connector> cls) {
        try {
            return (Connector) Utils.newInstance(cls);
        } catch (Throwable th) {
            throw new ConnectException("Failed to create connector instance", th);
        }
    }

    public List<Map<String, String>> connectorTaskConfigs(String str, int i, List<String> list) {
        log.trace("Reconfiguring connector tasks for {}", str);
        Connector connector = this.connectors.get(str);
        if (connector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        ArrayList arrayList = new ArrayList();
        String name = connector.taskClass().getName();
        Iterator it = connector.taskConfigs(i).iterator();
        while (it.hasNext()) {
            HashMap hashMap = new HashMap((Map) it.next());
            hashMap.put(TaskConfig.TASK_CLASS_CONFIG, name);
            if (list != null) {
                hashMap.put(ConnectorConfig.TOPICS_CONFIG, Utils.join(list, ","));
            }
            arrayList.add(hashMap);
        }
        return arrayList;
    }

    public void stopConnector(String str) {
        log.info("Stopping connector {}", str);
        Connector connector = this.connectors.get(str);
        if (connector == null) {
            throw new ConnectException("Connector " + str + " not found in this worker.");
        }
        try {
            connector.stop();
        } catch (ConnectException e) {
            log.error("Error shutting down connector {}: ", connector, e);
        }
        this.connectors.remove(str);
        log.info("Stopped connector {}", str);
    }

    public Set<String> connectorNames() {
        return this.connectors.keySet();
    }

    public void addTask(ConnectorTaskId connectorTaskId, TaskConfig taskConfig) {
        WorkerTask workerSinkTask;
        log.info("Creating task {}", connectorTaskId);
        if (this.tasks.containsKey(connectorTaskId)) {
            String str = "Task already exists in this worker; the herder should not have requested that this : " + connectorTaskId;
            log.error(str);
            throw new ConnectException(str);
        }
        Class asSubclass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
        SourceTask instantiateTask = instantiateTask(asSubclass);
        log.info("Instantiated task {} with version {} of type {}", new Object[]{connectorTaskId, instantiateTask.version(), asSubclass.getName()});
        if (instantiateTask instanceof SourceTask) {
            workerSinkTask = new WorkerSourceTask(connectorTaskId, instantiateTask, this.keyConverter, this.valueConverter, this.producer, new OffsetStorageReaderImpl(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), new OffsetStorageWriter(this.offsetBackingStore, connectorTaskId.connector(), this.internalKeyConverter, this.internalValueConverter), this.config, this.time);
        } else {
            if (!(instantiateTask instanceof SinkTask)) {
                log.error("Tasks must be a subclass of either SourceTask or SinkTask", instantiateTask);
                throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
            }
            workerSinkTask = new WorkerSinkTask(connectorTaskId, (SinkTask) instantiateTask, this.config, this.keyConverter, this.valueConverter, this.time);
        }
        workerSinkTask.start(taskConfig.originalsStrings());
        if (instantiateTask instanceof SourceTask) {
            this.sourceTaskOffsetCommitter.schedule(connectorTaskId, (WorkerSourceTask) workerSinkTask);
        }
        this.tasks.put(connectorTaskId, workerSinkTask);
    }

    private static Task instantiateTask(Class<? extends Task> cls) {
        try {
            return (Task) Utils.newInstance(cls);
        } catch (KafkaException e) {
            throw new ConnectException("Task class not found", e);
        }
    }

    public void stopTask(ConnectorTaskId connectorTaskId) {
        log.info("Stopping task {}", connectorTaskId);
        WorkerTask task = getTask(connectorTaskId);
        if (task instanceof WorkerSourceTask) {
            this.sourceTaskOffsetCommitter.remove(connectorTaskId);
        }
        task.stop();
        if (!task.awaitStop(this.config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG).longValue())) {
            log.error("Graceful stop of task {} failed.", task);
        }
        task.close();
        this.tasks.remove(connectorTaskId);
    }

    public Set<ConnectorTaskId> taskIds() {
        return this.tasks.keySet();
    }

    private WorkerTask getTask(ConnectorTaskId connectorTaskId) {
        WorkerTask workerTask = this.tasks.get(connectorTaskId);
        if (workerTask != null) {
            return workerTask;
        }
        log.error("Task not found: " + connectorTaskId);
        throw new ConnectException("Task not found: " + connectorTaskId);
    }

    public Converter getInternalKeyConverter() {
        return this.internalKeyConverter;
    }

    public Converter getInternalValueConverter() {
        return this.internalValueConverter;
    }
}
