package org.apache.kafka.connect.runtime.standalone;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerder.class */
/**
 * {@link Herder} implementation that keeps all connector state in an in-memory map and
 * drives a single {@link Worker} directly.
 *
 * <p>Every method that reads or mutates the connector map is {@code synchronized} on this
 * instance. Callbacks are invoked synchronously, while that monitor is held.
 */
public class StandaloneHerder implements Herder {
    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);

    /** Configuration key holding the connector's name. */
    private static final String NAME_CONFIG = "name";
    /** Configuration key passed to the worker when computing task configs. */
    private static final String TOPICS_CONFIG = "topics";

    private final Worker worker;
    /** Connector state tracked by this herder, keyed by connector name. */
    private final Map<String, ConnectorState> connectors = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/standalone/StandaloneHerder$ConnectorState.class */
    /**
     * Per-connector bookkeeping: the raw configuration as submitted, its parsed form, and the
     * task configurations most recently applied for the connector.
     */
    public static class ConnectorState {
        public String name;
        public Map<String, String> configOriginals;
        public ConnectorConfig config;
        List<Map<String, String>> taskConfigs = new ArrayList<>();

        /**
         * @param map             the connector configuration exactly as submitted
         * @param connectorConfig the parsed form of {@code map}; its {@code name} setting
         *                        becomes {@link #name}
         */
        public ConnectorState(Map<String, String> map, ConnectorConfig connectorConfig) {
            this.name = connectorConfig.getString(NAME_CONFIG);
            this.configOriginals = map;
            this.config = connectorConfig;
        }
    }

    /**
     * @param worker the worker on which connectors and tasks are started and stopped
     */
    public StandaloneHerder(Worker worker) {
        this.worker = worker;
    }

    /** Logs start-up; this herder has no other start-up work. */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void start() {
        log.info("Herder starting");
        log.info("Herder started");
    }

    /**
     * Stops the tasks and then the connector for every tracked connector, and forgets all
     * connector state. A {@link ConnectException} while stopping one connector is logged and
     * does not prevent the remaining connectors from being stopped.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void stop() {
        log.info("Herder stopping");
        // Iterate over a snapshot of the names so the loop is independent of the live key set.
        for (String connectorName : new HashSet<String>(this.connectors.keySet())) {
            removeConnectorTasks(connectorName);
            try {
                this.worker.stopConnector(connectorName);
            } catch (ConnectException e) {
                log.error("Error shutting down connector {}: ", connectorName, e);
            }
        }
        this.connectors.clear();
        log.info("Herder stopped");
    }

    /**
     * Completes {@code callback} with a snapshot of the names of all tracked connectors.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void connectors(Callback<Collection<String>> callback) {
        callback.onCompletion(null, new ArrayList<String>(this.connectors.keySet()));
    }

    /**
     * Completes {@code callback} with the {@link ConnectorInfo} of connector {@code str}, or
     * with a {@link NotFoundException} if this herder does not track such a connector.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void connectorInfo(String str, Callback<ConnectorInfo> callback) {
        ConnectorState connectorState = this.connectors.get(str);
        if (connectorState == null) {
            callback.onCompletion(notFound(str), null);
        } else {
            callback.onCompletion(null, createConnectorInfo(connectorState));
        }
    }

    /**
     * Builds the externally visible description of a connector: its name, its original
     * configuration, and one task id per currently applied task configuration.
     *
     * @return the description, or {@code null} when {@code connectorState} is {@code null}
     */
    private ConnectorInfo createConnectorInfo(ConnectorState connectorState) {
        if (connectorState == null) {
            return null;
        }
        int taskCount = connectorState.taskConfigs.size();
        List<ConnectorTaskId> taskIds = new ArrayList<>(taskCount);
        for (int i = 0; i < taskCount; i++) {
            taskIds.add(new ConnectorTaskId(connectorState.name, i));
        }
        return new ConnectorInfo(connectorState.name, connectorState.configOriginals, taskIds);
    }

    /**
     * Completes {@code callback} with the configuration of connector {@code str}. Delegates to
     * {@link #connectorInfo(String, Callback)} and forwards any error it reports unchanged.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void connectorConfig(String str, final Callback<Map<String, String>> callback) {
        connectorInfo(str, new Callback<ConnectorInfo>() { // from class: org.apache.kafka.connect.runtime.standalone.StandaloneHerder.1
            @Override // org.apache.kafka.connect.util.Callback
            public void onCompletion(Throwable th, ConnectorInfo connectorInfo) {
                if (th != null) {
                    callback.onCompletion(th, null);
                } else {
                    callback.onCompletion(null, connectorInfo.config());
                }
            }
        });
    }

    /**
     * Creates, replaces, or deletes a connector.
     *
     * <ul>
     *   <li>{@code map == null} deletes connector {@code str}: its tasks and the connector are
     *       stopped and its state is dropped. The callback receives a
     *       {@link NotFoundException} if the connector is not tracked.</li>
     *   <li>{@code map != null} starts the connector with that configuration and brings its
     *       tasks up to date. If the connector is already tracked it is stopped first, which
     *       requires {@code z} to be {@code true}; otherwise the callback receives an
     *       {@link AlreadyExistsException}.</li>
     * </ul>
     *
     * <p>The configuration is parsed, and its name compared with {@code str}, before anything
     * is stopped or started. A mismatch is reported to the callback as a
     * {@link ConnectException}; previously it registered the connector under the configured
     * name and then failed with a {@link NullPointerException} while looking up {@code str}.
     *
     * @param str      name of the connector to create, replace, or delete
     * @param map      the new configuration, or {@code null} to delete
     * @param z        whether an already tracked connector may be replaced or deleted
     * @param callback receives the outcome; any {@link ConnectException} raised while applying
     *                 the change is delivered here rather than thrown
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void putConnectorConfig(String str, Map<String, String> map, boolean z, Callback<Herder.Created<ConnectorInfo>> callback) {
        try {
            boolean exists = this.connectors.containsKey(str);
            if (exists && !z) {
                callback.onCompletion(new AlreadyExistsException("Connector " + str + " already exists"), null);
                return;
            }

            if (map == null) {
                // Deletion: the connector must already be tracked; its tasks go down with it.
                if (!exists) {
                    callback.onCompletion(notFound(str), null);
                    return;
                }
                removeConnectorTasks(str);
                this.worker.stopConnector(str);
                this.connectors.remove(str);
                callback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
                return;
            }

            // Parse and validate up front so that a rejected update leaves the currently
            // running connector untouched.
            ConnectorConfig parsedConfig = new ConnectorConfig(map);
            String configuredName = parsedConfig.getString(NAME_CONFIG);
            if (configuredName == null || !configuredName.equals(str)) {
                callback.onCompletion(new ConnectException("Connector name in configuration (" + configuredName
                        + ") does not match requested connector name (" + str + ")"), null);
                return;
            }

            if (exists) {
                this.worker.stopConnector(str);
            }
            startConnector(map, parsedConfig);
            updateConnectorTasks(str);
            callback.onCompletion(null, new Herder.Created<ConnectorInfo>(!exists, createConnectorInfo(this.connectors.get(str))));
        } catch (ConnectException e) {
            callback.onCompletion(e, null);
        }
    }

    /**
     * Recomputes and, if they changed, reapplies the task configurations of connector
     * {@code str}. The request is logged and ignored unless the connector is known to both the
     * worker and this herder; the herder-side check avoids dereferencing missing state.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void requestTaskReconfiguration(String str) {
        if (!this.worker.connectorNames().contains(str) || !this.connectors.containsKey(str)) {
            log.error("Task that requested reconfiguration does not exist: {}", str);
            return;
        }
        updateConnectorTasks(str);
    }

    /**
     * Completes {@code callback} with one {@link TaskInfo} per task configuration currently
     * applied for connector {@code str}, or with a {@link NotFoundException} if the connector
     * is not tracked.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public synchronized void taskConfigs(String str, Callback<List<TaskInfo>> callback) {
        ConnectorState connectorState = this.connectors.get(str);
        if (connectorState == null) {
            callback.onCompletion(notFound(str), null);
            return;
        }
        List<TaskInfo> taskInfos = new ArrayList<>(connectorState.taskConfigs.size());
        for (int i = 0; i < connectorState.taskConfigs.size(); i++) {
            taskInfos.add(new TaskInfo(new ConnectorTaskId(str, i), connectorState.taskConfigs.get(i)));
        }
        callback.onCompletion(null, taskInfos);
    }

    /**
     * Not supported by this herder.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void putTaskConfigs(String str, List<Map<String, String>> list, Callback<Void> callback) {
        throw new UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations.");
    }

    /** Builds the error reported when a connector name is not tracked by this herder. */
    private static NotFoundException notFound(String connectorName) {
        return new NotFoundException("Connector " + connectorName + " not found");
    }

    /**
     * Adds the connector to the worker and records its configuration. State is recorded only
     * after the worker call returns, so a failed start leaves the tracked state unchanged.
     *
     * @param originals the configuration exactly as submitted
     * @param config    the parsed form of {@code originals}
     */
    private void startConnector(Map<String, String> originals, ConnectorConfig config) {
        String connectorName = config.getString(NAME_CONFIG);
        ConnectorState existing = this.connectors.get(connectorName);
        this.worker.addConnector(config, new HerderConnectorContext(this, connectorName));
        if (existing == null) {
            this.connectors.put(connectorName, new ConnectorState(originals, config));
        } else {
            // Replacement: keep the state object (and its applied task configs) and swap
            // in the new configuration.
            existing.configOriginals = originals;
            existing.config = config;
        }
    }

    /**
     * Asks the worker for the task configurations of a connector, passing the connector's
     * configured maximum task count and topic list. The connector must be tracked.
     */
    private List<Map<String, String>> recomputeTaskConfigs(String connectorName) {
        ConnectorConfig config = this.connectors.get(connectorName).config;
        int maxTasks = config.getInt(ConnectorConfig.TASKS_MAX_CONFIG).intValue();
        List<String> topics = config.getList(TOPICS_CONFIG);
        return this.worker.connectorTaskConfigs(connectorName, maxTasks, topics);
    }

    /**
     * Adds one task to the worker per recorded task configuration, numbering tasks from zero.
     * A failure to add one task is logged and does not stop the remaining tasks from being
     * added. The connector must be tracked.
     */
    private void createConnectorTasks(String connectorName) {
        int index = 0;
        for (Map<String, String> taskConfig : this.connectors.get(connectorName).taskConfigs) {
            ConnectorTaskId taskId = new ConnectorTaskId(connectorName, index);
            try {
                this.worker.addTask(taskId, new TaskConfig(taskConfig));
            } catch (Throwable th) {
                log.error("Failed to add task {}: ", taskId, th);
            }
            index++;
        }
    }

    /**
     * Stops every task recorded for the connector and clears the recorded task
     * configurations. A {@link ConnectException} while stopping one task is logged and does
     * not stop the remaining tasks from being stopped. The connector must be tracked.
     */
    private void removeConnectorTasks(String connectorName) {
        ConnectorState state = this.connectors.get(connectorName);
        for (int i = 0; i < state.taskConfigs.size(); i++) {
            ConnectorTaskId taskId = new ConnectorTaskId(connectorName, i);
            try {
                this.worker.stopTask(taskId);
            } catch (ConnectException e) {
                log.error("Failed to stop task {}: ", taskId, e);
            }
        }
        state.taskConfigs = new ArrayList<>();
    }

    /**
     * Recomputes the connector's task configurations and, only if they differ from the ones
     * currently applied, stops the existing tasks and starts tasks for the new
     * configurations. The connector must be tracked.
     */
    private void updateConnectorTasks(String connectorName) {
        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connectorName);
        ConnectorState state = this.connectors.get(connectorName);
        if (newTaskConfigs.equals(state.taskConfigs)) {
            return;
        }
        removeConnectorTasks(connectorName);
        state.taskConfigs = newTaskConfigs;
        createConnectorTasks(connectorName);
    }
}
