package org.apache.kafka.connect.runtime.distributed;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Total;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.HerderRequest;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.ConnectProtocol;
import org.apache.kafka.connect.runtime.rest.RestClient;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.SinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerder.class */
public class DistributedHerder extends AbstractHerder implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
    // How long stop() waits for forwardRequestExecutor to terminate before forcing it down.
    private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
    // How long stop() waits for startAndStopExecutor to terminate before forcing it down.
    private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
    // NOTE(review): not referenced in this part of the file; presumably the delay before a failed
    // task reconfiguration is retried - confirm at the usage site.
    private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
    // Number of threads in startAndStopExecutor.
    private static final int START_STOP_THREAD_POOL_SIZE = 8;
    // NOTE(review): presumably supplies the 'seq' tie-breaker of DistributedHerderRequest; the
    // call site is not in this part of the file.
    private final AtomicLong requestSeqNum;
    // Clock used for request scheduling in tick() and for rebalance metrics.
    private final Time time;
    private final HerderMetrics herderMetrics;
    // Value of DistributedConfig.GROUP_ID_CONFIG; compared against sink consumer group ids
    // in validateBasicConnectorConfig().
    private final String workerGroupId;
    // Timeout handed to readConfigToEnd() by tick() when canReadConfigs is false.
    private final int workerSyncTimeoutMs;
    // How long stop() waits for the herder thread to finish before interrupting it.
    private final long workerTasksShutdownTimeoutMs;
    // Value of DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG; not used in this part of the file.
    private final int workerUnsyncBackoffMs;
    // Single-threaded executor that runs this herder's run() loop (thread name "DistributedHerder").
    private final ExecutorService herderExecutor;
    // Single-threaded executor; only its shutdown is visible in this part of the file.
    private final ExecutorService forwardRequestExecutor;
    // Fixed pool of START_STOP_THREAD_POOL_SIZE threads; shut down in stop().
    private final ExecutorService startAndStopExecutor;
    // Group membership client: polled in tick(), woken up by listeners and by stop().
    private final WorkerGroupMember member;
    // Set to true by stop(); run() keeps calling tick() until it observes the flag.
    private final AtomicBoolean stopping;
    // Initialised to true, set to false by RebalanceListener.onAssigned(); onRevoked() skips
    // stopping connectors/tasks while it is false.
    private boolean rebalanceResolved;
    // Most recent assignment delivered by RebalanceListener.onAssigned(); null until the first one.
    private ConnectProtocol.Assignment assignment;
    // When false, tick() first calls readConfigToEnd() and does nothing else if that fails.
    private boolean canReadConfigs;
    // Last configuration snapshot taken from configBackingStore (starts as ClusterConfigState.EMPTY).
    private ClusterConfigState configState;
    // Pending requests, ordered by DistributedHerderRequest.compareTo (due time, then sequence number).
    final NavigableSet<DistributedHerderRequest> requests;
    // Connectors whose config changed since the last tick(); accessed under synchronized (this).
    private Set<String> connectorConfigUpdates;
    // Connectors whose target state changed since the last tick(); accessed under synchronized (this).
    private Set<String> connectorTargetStateChanges;
    // Set by ConfigUpdateListener when a change requires a group rejoin; consumed and cleared by tick().
    private boolean needsReconfigRebalance;
    // Generation id from the latest onAssigned(); volatile because it is read by a metrics supplier.
    private volatile int generation;
    private final DistributedConfig config;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerder$ConfigUpdateListener.class */
    /**
     * Listener registered on the config backing store. Each callback records the change on the
     * enclosing herder while holding the herder's monitor, then wakes up the group member so the
     * herder loop can react to it.
     */
    public class ConfigUpdateListener implements ConfigBackingStore.UpdateListener {
        public ConfigUpdateListener() {
        }

        /**
         * Records that a connector's config was removed. A rebalance is requested only when the
         * connector is present in the herder's current config snapshot.
         *
         * @param str name of the connector whose config was removed
         */
        @Override
        public void onConnectorConfigRemove(String str) {
            log.info("Connector {} config removed", str);
            synchronized (DistributedHerder.this) {
                boolean knownConnector = configState.contains(str);
                if (knownConnector) {
                    needsReconfigRebalance = true;
                }
                connectorConfigUpdates.add(str);
            }
            member.wakeup();
        }

        /**
         * Records that a connector's config was written. A rebalance is requested only when the
         * connector is not yet present in the herder's current config snapshot.
         *
         * @param str name of the connector whose config was updated
         */
        @Override
        public void onConnectorConfigUpdate(String str) {
            log.info("Connector {} config updated", str);
            synchronized (DistributedHerder.this) {
                boolean knownConnector = configState.contains(str);
                if (!knownConnector) {
                    needsReconfigRebalance = true;
                }
                connectorConfigUpdates.add(str);
            }
            member.wakeup();
        }

        /**
         * Records that task configs changed; this always requests a rebalance.
         *
         * @param collection ids of the tasks whose configs were updated (only logged here)
         */
        @Override
        public void onTaskConfigUpdate(Collection<ConnectorTaskId> collection) {
            log.info("Tasks {} configs updated", collection);
            synchronized (DistributedHerder.this) {
                needsReconfigRebalance = true;
            }
            member.wakeup();
        }

        /**
         * Records that a connector's target state changed so the herder loop can apply it.
         *
         * @param str name of the connector whose target state changed
         */
        @Override
        public void onConnectorTargetStateChange(String str) {
            log.info("Connector {} target state change", str);
            synchronized (DistributedHerder.this) {
                connectorTargetStateChanges.add(str);
            }
            member.wakeup();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerder$DistributedHerderRequest.class */
    /**
     * A unit of work queued for the herder thread. Requests are ordered by their due time and,
     * for equal due times, by their sequence number; equality and hashing use the same two values.
     */
    public class DistributedHerderRequest implements HerderRequest, Comparable<DistributedHerderRequest> {
        private final long at;
        private final long seq;
        private final Callable<Void> action;
        private final Callback<Void> callback;

        /**
         * @param j        due time (compared against {@code time.milliseconds()} by the herder loop)
         * @param j2       sequence number used to break ties between equal due times
         * @param callable the work to run
         * @param callback notified with the outcome of the work
         */
        public DistributedHerderRequest(long j, long j2, Callable<Void> callable, Callback<Void> callback) {
            this.at = j;
            this.seq = j2;
            this.action = callable;
            this.callback = callback;
        }

        /** @return the work to run for this request */
        public Callable<Void> action() {
            return action;
        }

        /** @return the callback to notify once the request has been handled */
        public Callback<Void> callback() {
            return callback;
        }

        /** Removes this request from the herder's pending-request set. */
        @Override
        public void cancel() {
            requests.remove(this);
        }

        /** Orders by due time first, then by sequence number. */
        @Override
        public int compareTo(DistributedHerderRequest distributedHerderRequest) {
            int byDueTime = Long.compare(at, distributedHerderRequest.at);
            if (byDueTime != 0) {
                return byDueTime;
            }
            return Long.compare(seq, distributedHerderRequest.seq);
        }

        /** Two requests are equal when both due time and sequence number match. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof DistributedHerderRequest)) {
                return false;
            }
            DistributedHerderRequest other = (DistributedHerderRequest) obj;
            return at == other.at && seq == other.seq;
        }

        @Override
        public int hashCode() {
            return Objects.hash(at, seq);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerder$HerderMetrics.class */
    /**
     * Rebalance-related metrics for this herder: leader URL, generation ("epoch"), whether a
     * rebalance is in progress, number and duration of completed rebalances, and the time since
     * the last completed rebalance.
     */
    public class HerderMetrics {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor rebalanceCompletedCounts;
        private final Sensor rebalanceTime;
        // Long.MIN_VALUE means "no rebalance has completed yet".
        private volatile long lastRebalanceCompletedAtMillis = Long.MIN_VALUE;
        private volatile boolean rebalancing = false;
        private volatile long rebalanceStartedAtMillis = 0;

        /**
         * Registers all metrics in the worker rebalance group of the given metrics instance.
         *
         * @param connectMetrics metrics instance providing the registry and the metric group
         */
        public HerderMetrics(ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            metricGroup = connectMetrics.group(registry.workerRebalanceGroupName(), new String[0]);

            metricGroup.addValueMetric(registry.leaderName, new ConnectMetrics.LiteralSupplier<String>() {
                @Override
                public String metricValue(long j) {
                    return leaderUrl();
                }
            });
            metricGroup.addValueMetric(registry.epoch, new ConnectMetrics.LiteralSupplier<Double>() {
                @Override
                public Double metricValue(long j) {
                    return Double.valueOf(generation);
                }
            });
            metricGroup.addValueMetric(registry.rebalanceMode, new ConnectMetrics.LiteralSupplier<Double>() {
                @Override
                public Double metricValue(long j) {
                    // 1.0 while a rebalance is in progress, 0.0 otherwise.
                    if (rebalancing) {
                        return Double.valueOf(1.0d);
                    }
                    return Double.valueOf(0.0d);
                }
            });

            rebalanceCompletedCounts = metricGroup.sensor("completed-rebalance-count");
            rebalanceCompletedCounts.add(metricGroup.metricName(registry.rebalanceCompletedTotal), new Total());

            rebalanceTime = metricGroup.sensor("rebalance-time");
            rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeMax), new Max());
            rebalanceTime.add(metricGroup.metricName(registry.rebalanceTimeAvg), new Avg());

            metricGroup.addValueMetric(registry.rebalanceTimeSinceLast, new ConnectMetrics.LiteralSupplier<Double>() {
                @Override
                public Double metricValue(long j) {
                    // Infinite until the first rebalance completes; afterwards the elapsed time
                    // between the supplied timestamp and that completion.
                    if (lastRebalanceCompletedAtMillis == Long.MIN_VALUE) {
                        return Double.valueOf(Double.POSITIVE_INFINITY);
                    }
                    return Double.valueOf((double) (j - lastRebalanceCompletedAtMillis));
                }
            });
        }

        /** Closes the underlying metric group. */
        void close() {
            metricGroup.close();
        }

        /**
         * Marks a rebalance as in progress.
         *
         * @param j timestamp at which the rebalance started
         */
        void rebalanceStarted(long j) {
            rebalanceStartedAtMillis = j;
            rebalancing = true;
        }

        /**
         * Marks the current rebalance as finished and records its count and duration.
         *
         * @param j timestamp at which the rebalance completed
         */
        void rebalanceSucceeded(long j) {
            // Clamp at zero so a completion time earlier than the start never records a negative duration.
            long elapsedMs = Math.max(0L, j - rebalanceStartedAtMillis);
            rebalancing = false;
            rebalanceCompletedCounts.record(1.0d);
            rebalanceTime.record(elapsedMs);
            lastRebalanceCompletedAtMillis = j;
        }

        /** @return the metric group holding the rebalance metrics */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return metricGroup;
        }
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/DistributedHerder$RebalanceListener.class */
    /**
     * Reacts to group rebalances: records new assignments on the herder and stops revoked
     * connectors and tasks before a rebalance proceeds.
     */
    public class RebalanceListener implements WorkerRebalanceListener {
        public RebalanceListener() {
        }

        /**
         * Stores the new assignment and generation on the herder, marks the rebalance as not yet
         * resolved, and starts the rebalance timer. If this worker is the leader it also calls
         * {@code updateDeletedConnectorStatus()}. Finally wakes up the group member.
         *
         * @param assignment the assignment received for this worker
         * @param i          the group generation the assignment belongs to
         */
        @Override
        public void onAssigned(ConnectProtocol.Assignment assignment, int i) {
            log.info("Joined group and got assignment: {}", assignment);
            synchronized (DistributedHerder.this) {
                DistributedHerder.this.assignment = assignment;
                generation = i;
                rebalanceResolved = false;
                herderMetrics.rebalanceStarted(time.milliseconds());
            }
            if (isLeader()) {
                updateDeletedConnectorStatus();
            }
            member.wakeup();
        }

        /** Intentionally a no-op; assignments are handled by the two-argument overload. */
        @Override
        public void onAssigned(ConnectProtocol.Assignment assignment) {
        }

        /**
         * Stops the revoked connectors and tasks and flushes the status store. Nothing is stopped
         * when the previous rebalance was never resolved.
         *
         * @param str         not used by this implementation
         * @param collection  names of the connectors being revoked
         * @param collection2 ids of the tasks being revoked
         */
        @Override
        public void onRevoked(String str, Collection<String> collection, Collection<ConnectorTaskId> collection2) {
            log.info("Rebalance started");
            if (!rebalanceResolved) {
                // The previous message read "Wasn't unable to resume work", a double negative
                // that said the opposite of what this branch means.
                log.info("Wasn't able to resume work after last rebalance, can skip stopping connectors and tasks");
                return;
            }
            List<Callable<Void>> stopCallables = new ArrayList<>();
            for (String connectorName : collection) {
                stopCallables.add(getConnectorStoppingCallable(connectorName));
            }
            for (ConnectorTaskId taskId : collection2) {
                stopCallables.add(getTaskStoppingCallable(taskId));
            }
            startAndStop(stopCallables);
            statusBackingStore.flush();
            log.info("Finished stopping tasks in preparation for rebalance");
        }
    }

    /**
     * Creates a herder that builds its own {@link WorkerGroupMember} (a null member is passed to
     * the delegate constructor) and registers a {@link ConfigUpdateListener} on the config store.
     * The worker id and metrics are taken from {@code worker}.
     *
     * @param distributedConfig  distributed worker configuration
     * @param time               clock used by the herder
     * @param worker             worker that runs connectors and tasks
     * @param str                forwarded to the superclass constructor as its third argument
     *                           (presumably the Kafka cluster id - confirm against AbstractHerder)
     * @param statusBackingStore store for connector and task status
     * @param configBackingStore store for connector and task configuration
     * @param str2               forwarded to the WorkerGroupMember constructor (presumably this
     *                           worker's REST URL - confirm against WorkerGroupMember)
     */
    public DistributedHerder(DistributedConfig distributedConfig, Time time, Worker worker, String str, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, String str2) {
        this(distributedConfig, worker, worker.workerId(), str, statusBackingStore, configBackingStore, null, str2, worker.metrics(), time);
        configBackingStore.setUpdateListener(new ConfigUpdateListener());
    }

    /**
     * Full constructor; allows a pre-built group member to be supplied.
     *
     * @param distributedConfig  distributed worker configuration
     * @param worker             worker that runs connectors and tasks
     * @param str                forwarded to the superclass constructor as its second argument
     *                           (the public constructor passes {@code worker.workerId()} here)
     * @param str2               forwarded to the superclass constructor as its third argument
     * @param statusBackingStore store for connector and task status
     * @param configBackingStore store for connector and task configuration
     * @param workerGroupMember  group member to use, or null to create a new one
     * @param str3               passed to the WorkerGroupMember constructor when one is created
     * @param connectMetrics     metrics instance used to register the herder metrics
     * @param time               clock used by the herder
     */
    DistributedHerder(DistributedConfig distributedConfig, Worker worker, String str, String str2, StatusBackingStore statusBackingStore, ConfigBackingStore configBackingStore, WorkerGroupMember workerGroupMember, String str3, ConnectMetrics connectMetrics, Time time) {
        super(worker, str, str2, statusBackingStore, configBackingStore);

        // Request queue and pending-change bookkeeping.
        requestSeqNum = new AtomicLong();
        requests = new ConcurrentSkipListSet<>();
        connectorConfigUpdates = new HashSet<>();
        connectorTargetStateChanges = new HashSet<>();

        this.time = time;
        herderMetrics = new HerderMetrics(connectMetrics);

        // Settings read once from the worker configuration.
        workerGroupId = distributedConfig.getString(DistributedConfig.GROUP_ID_CONFIG);
        workerSyncTimeoutMs = distributedConfig.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
        workerTasksShutdownTimeoutMs = distributedConfig.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
        workerUnsyncBackoffMs = distributedConfig.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);

        // Use the supplied member if there is one, otherwise build our own.
        if (workerGroupMember != null) {
            member = workerGroupMember;
        } else {
            member = new WorkerGroupMember(distributedConfig, str3, this.configBackingStore, new RebalanceListener(), time);
        }

        // One dedicated, named thread runs the herder loop.
        ThreadFactory herderThreadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "DistributedHerder");
            }
        };
        herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1), herderThreadFactory);
        forwardRequestExecutor = Executors.newSingleThreadExecutor();
        startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);

        config = distributedConfig;
        stopping = new AtomicBoolean(false);

        // Initial mutable state.
        configState = ClusterConfigState.EMPTY;
        rebalanceResolved = true;
        needsReconfigRebalance = false;
        canReadConfigs = true;
    }

    /**
     * Starts the herder by submitting this instance (a {@link Runnable}) to the herder executor;
     * the actual work happens asynchronously in {@link #run()}.
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void start() {
        this.herderExecutor.submit(this);
    }

    /**
     * Body of the herder thread: starts services, calls {@link #tick()} repeatedly until the
     * stopping flag is set, then halts and closes the herder metrics. Any throwable that escapes
     * is logged and terminates the process with exit status 1.
     */
    @Override
    public void run() {
        try {
            log.info("Herder starting");
            startServices();
            log.info("Herder started");

            // Main loop: one tick per iteration until stop() flips the flag.
            boolean keepRunning = !stopping.get();
            while (keepRunning) {
                tick();
                keepRunning = !stopping.get();
            }

            halt();
            log.info("Herder stopped");
            herderMetrics.close();
        } catch (Throwable fatal) {
            log.error("Uncaught exception in herder work thread, exiting: ", fatal);
            Exit.exit(1);
        }
    }

    /**
     * Runs one iteration of the herder loop:
     * <ol>
     *   <li>if configs could not be read previously, tries to read them to the end and gives up
     *       for this iteration on failure;</li>
     *   <li>ensures group membership is active and that any completed rebalance was handled;</li>
     *   <li>runs every queued request whose due time has passed, completing its callback with
     *       either success or the thrown error;</li>
     *   <li>picks up pending config / target-state changes recorded by the config listener, or
     *       requests a rejoin (and returns) when a reconfiguration rebalance is needed;</li>
     *   <li>polls the group member until the next request is due.</li>
     * </ol>
     *
     * <p>The herder monitor is held only while the pending-change flags and sets are read and
     * swapped. It must not be held while processing the changes or while polling: the config
     * listener callbacks synchronize on this herder before calling {@code member.wakeup()}, so
     * holding the monitor across a poll that may block indefinitely would prevent those
     * callbacks from ever recording a change or waking the poll.
     *
     * <p>A {@link WakeupException} anywhere in the iteration simply ends the iteration.
     */
    public void tick() {
        try {
            if (!canReadConfigs && !readConfigToEnd(workerSyncTimeoutMs)) {
                return;
            }
            member.ensureActive();
            if (!handleRebalanceCompleted()) {
                return;
            }

            // Run all requests that are due; remember how long until the next one.
            long now = time.milliseconds();
            long nextRequestTimeoutMs = Long.MAX_VALUE;
            while (true) {
                DistributedHerderRequest next = peekWithoutException();
                if (next == null) {
                    break;
                }
                if (now < next.at) {
                    nextRequestTimeoutMs = next.at - now;
                    break;
                }
                requests.pollFirst();
                try {
                    next.action().call();
                    next.callback().onCompletion(null, null);
                } catch (Throwable t) {
                    next.callback().onCompletion(t, null);
                }
            }

            // Take ownership of the pending changes under the lock, then release it before acting.
            Set<String> configUpdates = null;
            Set<String> targetStateChanges = null;
            synchronized (this) {
                if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() || !connectorTargetStateChanges.isEmpty()) {
                    configState = configBackingStore.snapshot();
                    if (needsReconfigRebalance) {
                        // The rejoin supersedes any connector-level changes recorded so far.
                        member.requestRejoin();
                        connectorConfigUpdates.clear();
                        connectorTargetStateChanges.clear();
                        needsReconfigRebalance = false;
                        return;
                    }
                    if (!connectorConfigUpdates.isEmpty()) {
                        configUpdates = connectorConfigUpdates;
                        connectorConfigUpdates = new HashSet<>();
                    }
                    if (!connectorTargetStateChanges.isEmpty()) {
                        targetStateChanges = connectorTargetStateChanges;
                        connectorTargetStateChanges = new HashSet<>();
                    }
                }
            }

            if (configUpdates != null) {
                processConnectorConfigUpdates(configUpdates);
            }
            if (targetStateChanges != null) {
                processTargetStateChanges(targetStateChanges);
            }

            try {
                member.poll(nextRequestTimeoutMs);
                handleRebalanceCompleted();
            } catch (WakeupException wokenDuringPoll) {
                // Expected: a listener or stop() woke us up; the next tick picks up the work.
            }
        } catch (WakeupException wokenEarly) {
            // Expected: woken up before reaching the poll; the next tick starts over.
        }
    }

    /**
     * Applies connector config changes to the connectors assigned to this worker. Each affected
     * local connector is stopped; it is started again only if the current config snapshot still
     * contains it. Connectors not in this worker's assignment are ignored.
     *
     * @param set names of the connectors whose config changed
     */
    private void processConnectorConfigUpdates(Set<String> set) {
        Set<String> localConnectors;
        if (assignment == null) {
            localConnectors = Collections.emptySet();
        } else {
            localConnectors = new HashSet<String>(assignment.connectors());
        }

        for (String connectorName : set) {
            if (!localConnectors.contains(connectorName)) {
                continue;
            }
            boolean stillConfigured = configState.contains(connectorName);
            String action = stillConfigured ? "restarting" : "stopping";
            log.info("Handling connector-only config update by {} connector {}", action, connectorName);
            worker.stopConnector(connectorName);
            if (stillConfigured) {
                startConnector(connectorName);
            }
        }
    }

    /**
     * Applies target-state changes to the worker. For each connector present in the current
     * config snapshot the worker's target state is updated, and when the new state is
     * {@code STARTED} its tasks are reconfigured with retry. Unknown connectors are only logged.
     *
     * @param set names of the connectors whose target state changed
     */
    private void processTargetStateChanges(Set<String> set) {
        for (String connectorName : set) {
            TargetState newState = configState.targetState(connectorName);
            boolean known = configState.connectors().contains(connectorName);
            if (!known) {
                log.debug("Received target state change for unknown connector: {}", connectorName);
                continue;
            }
            worker.setTargetState(connectorName, newState);
            if (newState == TargetState.STARTED) {
                reconfigureConnectorTasksWithRetry(connectorName);
            }
        }
    }

    /**
     * Shuts down the herder's work while holding the herder monitor: stops every connector and
     * task still running on the worker, stops the group member, fails all queued requests with
     * a "Worker is shutting down" error, and stops the herder services.
     */
    public void halt() {
        synchronized (this) {
            log.info("Stopping connectors and tasks that are still assigned to this worker.");

            // Iterate over copies of the worker's current connector names and task ids.
            List<Callable<Void>> stopCallables = new ArrayList<Callable<Void>>();
            for (String connectorName : new ArrayList<String>(worker.connectorNames())) {
                stopCallables.add(getConnectorStoppingCallable(connectorName));
            }
            for (ConnectorTaskId taskId : new ArrayList<ConnectorTaskId>(worker.taskIds())) {
                stopCallables.add(getTaskStoppingCallable(taskId));
            }
            startAndStop(stopCallables);

            member.stop();

            // Drain the queue so no caller is left waiting on a request that will never run.
            for (DistributedHerderRequest pending = requests.pollFirst(); pending != null; pending = requests.pollFirst()) {
                pending.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
            }

            stopServices();
        }
    }

    /**
     * Requests shutdown and waits for the herder's executors to finish. Sets the stopping flag,
     * wakes up the group member so the herder loop notices, then shuts down the herder executor
     * followed by the forward-request and start/stop executors, forcing each one down if it does
     * not terminate within its timeout.
     *
     * <p>If the calling thread is interrupted while waiting, all three executors are forced down
     * immediately so no pool threads are left behind, and the thread's interrupt status is
     * restored for the caller.
     */
    @Override
    public void stop() {
        log.info("Herder stopping");
        stopping.set(true);
        member.wakeup();
        herderExecutor.shutdown();
        try {
            if (!herderExecutor.awaitTermination(workerTasksShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
                herderExecutor.shutdownNow();
            }
            forwardRequestExecutor.shutdown();
            startAndStopExecutor.shutdown();
            if (!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                forwardRequestExecutor.shutdownNow();
            }
            if (!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                startAndStopExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Depending on where the interrupt hit, some executors may not even have been asked
            // to shut down yet. shutdownNow() is safe to call on an already-terminated executor.
            herderExecutor.shutdownNow();
            forwardRequestExecutor.shutdownNow();
            startAndStopExecutor.shutdownNow();
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        log.info("Herder stopped");
    }

    /**
     * Queues a request that reports the connector names from the current config snapshot.
     * Nothing is reported by the request itself when {@code checkRebalanceNeeded(callback)}
     * returns true.
     *
     * @param callback receives the connector names, or an error
     */
    @Override
    public void connectors(final Callback<Collection<String>> callback) {
        log.trace("Submitting connector listing request");
        Callable<Void> listConnectors = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (!checkRebalanceNeeded(callback)) {
                    callback.onCompletion(null, configState.connectors());
                }
                return null;
            }
        };
        addRequest(listConnectors, forwardErrorCallback(callback));
    }

    /**
     * Queues a request that reports a connector's raw config, tasks and type from the current
     * config snapshot. Completes the callback with a {@link NotFoundException} when the snapshot
     * does not contain the connector; does nothing further when
     * {@code checkRebalanceNeeded(callback)} returns true.
     *
     * @param str      name of the connector
     * @param callback receives the connector info, or an error
     */
    @Override
    public void connectorInfo(final String str, final Callback<ConnectorInfo> callback) {
        log.trace("Submitting connector info request {}", str);
        Callable<Void> describeConnector = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (configState.contains(str)) {
                    Map<String, String> rawConfig = configState.rawConnectorConfig(str);
                    ConnectorInfo info = new ConnectorInfo(
                            str,
                            rawConfig,
                            configState.tasks(str),
                            connectorTypeForClass(rawConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
                    callback.onCompletion(null, info);
                } else {
                    callback.onCompletion(new NotFoundException("Connector " + str + " not found"), null);
                }
                return null;
            }
        };
        addRequest(describeConnector, forwardErrorCallback(callback));
    }

    /**
     * Looks up a connector's config in the herder's current config snapshot.
     *
     * @param str name of the connector
     * @return whatever {@code configState.connectorConfig(str)} returns for that name
     */
    @Override // org.apache.kafka.connect.runtime.AbstractHerder
    protected Map<String, String> config(String str) {
        return this.configState.connectorConfig(str);
    }

    /**
     * Reports a connector's config by delegating to
     * {@link #connectorInfo(String, Callback)} and extracting the config from the result.
     *
     * @param str      name of the connector
     * @param callback receives the connector's config, or the error from the info lookup
     */
    @Override
    public void connectorConfig(String str, final Callback<Map<String, String>> callback) {
        log.trace("Submitting connector config read request {}", str);
        Callback<ConnectorInfo> extractConfig = new Callback<ConnectorInfo>() {
            @Override
            public void onCompletion(Throwable th, ConnectorInfo connectorInfo) {
                if (th == null) {
                    callback.onCompletion(null, connectorInfo.config());
                    return;
                }
                callback.onCompletion(th, null);
            }
        };
        connectorInfo(str, extractConfig);
    }

    /**
     * Queues a request that removes a connector's config from the config backing store. The
     * callback is completed with a {@code NotLeaderException} when this worker is not the
     * leader, with a {@link NotFoundException} when the current snapshot does not contain the
     * connector, and otherwise with {@code Created(false, null)} after the removal was issued.
     *
     * @param str      name of the connector to delete
     * @param callback receives the outcome
     */
    @Override
    public void deleteConnectorConfig(final String str, final Callback<Herder.Created<ConnectorInfo>> callback) {
        Callable<Void> removeConnector = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                log.trace("Handling connector config request {}", str);
                if (!isLeader()) {
                    NotLeaderException notLeader = new NotLeaderException("Only the leader can set connector configs.", leaderUrl());
                    callback.onCompletion(notLeader, null);
                } else if (!configState.contains(str)) {
                    callback.onCompletion(new NotFoundException("Connector " + str + " not found"), null);
                } else {
                    log.trace("Removing connector config {} {}", str, configState.connectors());
                    configBackingStore.removeConnectorConfig(str);
                    callback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
                }
                return null;
            }
        };
        addRequest(removeConnector, forwardErrorCallback(callback));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass validation and, for sink connectors, adds an error to the "name"
     * value when the consumer group derived from the connector name equals this worker's
     * group id.
     *
     * @param connector the connector being validated
     * @param configDef the config definition to validate against
     * @param map       the connector config to validate
     * @return the validated values, keyed by config name
     */
    @Override
    public Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector, ConfigDef configDef, Map<String, String> map) {
        Map<String, ConfigValue> validated = super.validateBasicConnectorConfig(connector, configDef, map);
        if (!(connector instanceof SinkConnector)) {
            return validated;
        }

        ConfigValue nameValue = validated.get("name");
        String connectorName = (String) nameValue.value();
        String sinkConsumerGroup = SinkUtils.consumerGroupId(connectorName);
        if (workerGroupId.equals(sinkConsumerGroup)) {
            nameValue.addErrorMessage("Consumer group for sink connector named " + connectorName + " conflicts with Connect worker group " + workerGroupId);
        }
        return validated;
    }

    /**
     * Queues a request that validates and writes a connector config. In order, the request:
     * stops if {@code maybeAddConfigErrors} reports validation errors; fails with a
     * {@code NotLeaderException} when this worker is not the leader; fails with an
     * {@link AlreadyExistsException} when the connector exists and replacement is not allowed;
     * otherwise writes the config to the backing store and reports the resulting connector info.
     *
     * @param str      name of the connector
     * @param map      the connector config to store
     * @param z        whether an existing connector with this name may be replaced
     * @param callback receives the outcome; {@code created} is true when the connector did not
     *                 exist in the current snapshot
     */
    @Override
    public void putConnectorConfig(final String str, final Map<String, String> map, final boolean z, final Callback<Herder.Created<ConnectorInfo>> callback) {
        log.trace("Submitting connector config write request {}", str);
        Callable<Void> writeConfig = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (maybeAddConfigErrors(validateConnectorConfig(map), callback)) {
                    return null;
                }
                log.trace("Handling connector config request {}", str);
                if (!isLeader()) {
                    NotLeaderException notLeader = new NotLeaderException("Only the leader can set connector configs.", leaderUrl());
                    callback.onCompletion(notLeader, null);
                    return null;
                }
                boolean alreadyExists = configState.contains(str);
                if (alreadyExists && !z) {
                    callback.onCompletion(new AlreadyExistsException("Connector " + str + " already exists"), null);
                    return null;
                }
                log.trace("Submitting connector config {} {} {}", str, z, configState.connectors());
                configBackingStore.putConnectorConfig(str, map);
                ConnectorInfo info = new ConnectorInfo(
                        str,
                        map,
                        configState.tasks(str),
                        connectorTypeForClass(map.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
                callback.onCompletion(null, new Herder.Created<ConnectorInfo>(!alreadyExists, info));
                return null;
            }
        };
        addRequest(writeConfig, forwardErrorCallback(callback));
    }

    /**
     * Queues a task reconfiguration for the connector named {@code str}.
     *
     * <p>The queued action calls {@code reconfigureConnectorTasksWithRetry}; if the request
     * completes with a throwable, it is only logged.
     *
     * @param str name of the connector whose tasks should be reconfigured
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void requestTaskReconfiguration(final String str) {
        log.trace("Submitting connector task reconfiguration request {}", str);
        final Callable<Void> reconfigureAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                DistributedHerder.this.reconfigureConnectorTasksWithRetry(str);
                return null;
            }
        };
        final Callback<Void> logFailure = new Callback<Void>() {
            @Override
            public void onCompletion(Throwable error, Void ignored) {
                if (error == null) {
                    return;
                }
                DistributedHerder.log.error("Unexpected error during task reconfiguration: ", error);
                DistributedHerder.log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", str);
            }
        };
        addRequest(reconfigureAction, logFailure);
    }

    /**
     * Queues a request that reports the task configurations of the connector named {@code str}.
     *
     * <p>When the request runs it stops early if {@code checkRebalanceNeeded} returns
     * {@code true}, or reports a {@code NotFoundException} if the config state does not contain
     * the connector. Otherwise it builds one {@code TaskInfo} per task index below
     * {@code configState.taskCount(str)} and passes the list to {@code callback}.
     *
     * @param str      name of the connector
     * @param callback receives the list of task infos or the error
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void taskConfigs(final String str, final Callback<List<TaskInfo>> callback) {
        log.trace("Submitting get task configuration request {}", str);
        addRequest(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.contains(str)) {
                    callback.onCompletion(new NotFoundException("Connector " + str + " not found"), null);
                    return null;
                }
                // Typed list instead of a raw ArrayList so the callback call is checked by the compiler.
                final List<TaskInfo> taskInfos = new ArrayList<TaskInfo>();
                for (int i = 0; i < DistributedHerder.this.configState.taskCount(str); i++) {
                    final ConnectorTaskId taskId = new ConnectorTaskId(str, i);
                    taskInfos.add(new TaskInfo(taskId, DistributedHerder.this.configState.rawTaskConfig(taskId)));
                }
                callback.onCompletion(null, taskInfos);
                return null;
            }
        }, forwardErrorCallback(callback));
    }

    /**
     * Queues a request that writes the task configurations {@code list} for the connector
     * named {@code str}.
     *
     * <p>The write only happens when this worker is the leader and the config state contains
     * the connector; otherwise the callback receives a {@code NotLeaderException} or a
     * {@code NotFoundException} respectively.
     *
     * @param str      name of the connector
     * @param list     task configurations to write
     * @param callback receives {@code (null, null)} after the write, or the error
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void putTaskConfigs(final String str, final List<Map<String, String>> list, final Callback<Void> callback) {
        log.trace("Submitting put task configuration request {}", str);
        final Callable<Void> writeAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (!DistributedHerder.this.isLeader()) {
                    callback.onCompletion(new NotLeaderException("Only the leader may write task configurations.", DistributedHerder.this.leaderUrl()), null);
                } else if (!DistributedHerder.this.configState.contains(str)) {
                    callback.onCompletion(new NotFoundException("Connector " + str + " not found"), null);
                } else {
                    DistributedHerder.this.configBackingStore.putTaskConfigs(str, list);
                    callback.onCompletion(null, null);
                }
                return null;
            }
        };
        addRequest(writeAction, forwardErrorCallback(callback));
    }

    /**
     * Restarts the connector named {@code str} by delegating to
     * {@link #restartConnector(long, String, Callback)} with a first argument of {@code 0}.
     * The request object returned by that overload is discarded.
     *
     * @param str      name of the connector to restart
     * @param callback receives the outcome of the restart
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void restartConnector(String str, Callback<Void> callback) {
        restartConnector(0L, str, callback);
    }

    /**
     * Queues a restart of the connector named {@code str}; {@code j} is passed through as the
     * first argument of {@code addRequest}.
     *
     * <p>When the request runs, the callback receives an error if {@code checkRebalanceNeeded}
     * returns {@code true}, if the connector is unknown to the config state, or if it is not in
     * this worker's assignment. Otherwise the connector is stopped on the worker and started
     * again; any throwable raised while doing so is passed to the callback.
     *
     * @param j        value added to the current time when the request is created
     * @param str      name of the connector to restart
     * @param callback receives the outcome of the restart
     * @return the queued request
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public HerderRequest restartConnector(long j, final String str, final Callback<Void> callback) {
        final Callable<Void> restartAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.connectors().contains(str)) {
                    callback.onCompletion(new NotFoundException("Unknown connector: " + str), null);
                    return null;
                }
                if (!DistributedHerder.this.assignment.connectors().contains(str)) {
                    // The leader can name the owning worker; any other worker points at the leader.
                    if (DistributedHerder.this.isLeader()) {
                        callback.onCompletion(new NotAssignedException("Cannot restart connector since it is not assigned to this member", DistributedHerder.this.member.ownerUrl(str)), null);
                    } else {
                        callback.onCompletion(new NotLeaderException("Cannot restart connector since it is not assigned to this member", DistributedHerder.this.leaderUrl()), null);
                    }
                    return null;
                }
                try {
                    DistributedHerder.this.worker.stopConnector(str);
                    final boolean started = DistributedHerder.this.startConnector(str);
                    final Throwable failure = started ? null : new ConnectException("Failed to start connector: " + str);
                    callback.onCompletion(failure, null);
                } catch (Throwable problem) {
                    callback.onCompletion(problem, null);
                }
                return null;
            }
        };
        return addRequest(j, restartAction, forwardErrorCallback(callback));
    }

    /**
     * Queues a restart of the task identified by {@code connectorTaskId}.
     *
     * <p>When the request runs, the callback receives an error if {@code checkRebalanceNeeded}
     * returns {@code true}, if the connector or the task is unknown to the config state, or if
     * the task is not in this worker's assignment. Otherwise the task is stopped on the worker
     * and started again; any throwable raised while doing so is passed to the callback.
     *
     * @param connectorTaskId id of the task to restart
     * @param callback        receives the outcome of the restart
     */
    @Override // org.apache.kafka.connect.runtime.Herder
    public void restartTask(final ConnectorTaskId connectorTaskId, final Callback<Void> callback) {
        final Callable<Void> restartAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                if (DistributedHerder.this.checkRebalanceNeeded(callback)) {
                    return null;
                }
                if (!DistributedHerder.this.configState.connectors().contains(connectorTaskId.connector())) {
                    callback.onCompletion(new NotFoundException("Unknown connector: " + connectorTaskId.connector()), null);
                    return null;
                }
                if (DistributedHerder.this.configState.taskConfig(connectorTaskId) == null) {
                    callback.onCompletion(new NotFoundException("Unknown task: " + connectorTaskId), null);
                    return null;
                }
                if (!DistributedHerder.this.assignment.tasks().contains(connectorTaskId)) {
                    // The leader can name the owning worker; any other worker points at the leader.
                    if (DistributedHerder.this.isLeader()) {
                        callback.onCompletion(new NotAssignedException("Cannot restart task since it is not assigned to this member", DistributedHerder.this.member.ownerUrl(connectorTaskId)), null);
                    } else {
                        callback.onCompletion(new NotLeaderException("Cannot restart task since it is not assigned to this member", DistributedHerder.this.leaderUrl()), null);
                    }
                    return null;
                }
                try {
                    DistributedHerder.this.worker.stopAndAwaitTask(connectorTaskId);
                    final boolean started = DistributedHerder.this.startTask(connectorTaskId);
                    final Throwable failure = started ? null : new ConnectException("Failed to start task: " + connectorTaskId);
                    callback.onCompletion(failure, null);
                } catch (Throwable problem) {
                    callback.onCompletion(problem, null);
                }
                return null;
            }
        };
        addRequest(restartAction, forwardErrorCallback(callback));
    }

    /**
     * Returns the current value of this herder's {@code generation} field.
     *
     * @return the stored generation
     */
    @Override // org.apache.kafka.connect.runtime.AbstractHerder
    public int generation() {
        return this.generation;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells whether this worker is the leader according to the current assignment.
     *
     * @return {@code true} when an assignment exists and its leader equals this member's id
     */
    public boolean isLeader() {
        if (this.assignment == null) {
            return false;
        }
        return this.member.memberId().equals(this.assignment.leader());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the leader URL recorded in the current assignment.
     *
     * @return the assignment's leader URL, or {@code null} when there is no assignment
     */
    public String leaderUrl() {
        return this.assignment != null ? this.assignment.leaderUrl() : null;
    }

    /**
     * Resolves the outcome of a rebalance, starting assigned work once the local config state
     * matches the assignment.
     *
     * <p>If the assignment failed, or the config log could not be read to its end in time, or the
     * local config offset differs from the assignment's offset, a rejoin is requested and
     * {@code false} is returned. Otherwise {@code startWork} runs, the success is recorded in the
     * herder metrics and the rebalance is marked resolved.
     *
     * @return {@code true} if the rebalance is (or already was) resolved, {@code false} if a
     *         rejoin was requested
     */
    private boolean handleRebalanceCompleted() {
        if (this.rebalanceResolved) {
            return true;
        }
        boolean mustReadToEnd = false;
        boolean mustRejoin = false;
        if (!this.assignment.failed()) {
            if (this.configState.offset() < this.assignment.offset()) {
                log.warn("Catching up to assignment's config offset.");
                mustReadToEnd = true;
            }
        } else {
            // A failed assignment always leads to a rejoin; the only question is whether
            // the config log has to be re-read first.
            mustRejoin = true;
            if (isLeader()) {
                log.warn("Join group completed, but assignment failed and we are the leader. Reading to end of config and retrying.");
                mustReadToEnd = true;
            } else if (this.configState.offset() < this.assignment.offset()) {
                log.warn("Join group completed, but assignment failed and we lagging. Reading to end of config and retrying.");
                mustReadToEnd = true;
            } else {
                log.warn("Join group completed, but assignment failed. We were up to date, so just retrying.");
            }
        }
        if (mustReadToEnd) {
            final boolean caughtUp = readConfigToEnd(this.workerSyncTimeoutMs);
            if (!caughtUp) {
                this.canReadConfigs = false;
                mustRejoin = true;
            }
        }
        if (mustRejoin) {
            this.member.requestRejoin();
            return false;
        }
        final boolean offsetsMatch = this.configState.offset() == this.assignment.offset();
        if (!offsetsMatch) {
            log.info("Current config state offset {} does not match group assignment {}. Forcing rebalance.", Long.valueOf(this.configState.offset()), Long.valueOf(this.assignment.offset()));
            this.member.requestRejoin();
            return false;
        }
        startWork();
        this.herderMetrics.rebalanceSucceeded(this.time.milliseconds());
        this.rebalanceResolved = true;
        return true;
    }

    /**
     * Refreshes the config backing store and replaces the local config snapshot.
     *
     * <p>On a {@link TimeoutException} the member is asked to leave the group (via
     * {@code maybeLeaveGroup}) and this thread backs off for {@code workerUnsyncBackoffMs}.
     *
     * @param j timeout for the refresh, in milliseconds
     * @return {@code true} if the refresh finished and the snapshot was updated, {@code false}
     *         if it timed out
     */
    private boolean readConfigToEnd(long j) {
        log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", Long.valueOf(this.configState.offset()), Long.valueOf(this.assignment.offset()));
        boolean reachedEnd;
        try {
            this.configBackingStore.refresh(j, TimeUnit.MILLISECONDS);
            this.configState = this.configBackingStore.snapshot();
            log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", Long.valueOf(this.configState.offset()));
            reachedEnd = true;
        } catch (TimeoutException timedOut) {
            log.warn("Didn't reach end of config log quickly enough", timedOut);
            this.member.maybeLeaveGroup();
            backoff(this.workerUnsyncBackoffMs);
            reachedEnd = false;
        }
        return reachedEnd;
    }

    /**
     * Pauses the calling thread by delegating to {@code Utils.sleep}.
     *
     * @param j sleep duration handed to {@code Utils.sleep}; presumably milliseconds, since the
     *          visible caller passes {@code workerUnsyncBackoffMs} (confirm against Utils)
     */
    private void backoff(long j) {
        Utils.sleep(j);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs all given callables on the start/stop executor and waits for them to complete.
     *
     * <p>If the wait is interrupted, the interruption is logged and the thread's interrupt
     * status is restored so the caller can still observe it, rather than being silently lost.
     *
     * @param collection the start/stop actions to run
     */
    public void startAndStop(Collection<Callable<Void>> collection) {
        try {
            this.startAndStopExecutor.invokeAll(collection);
        } catch (InterruptedException e) {
            log.warn("Interrupted while waiting for connectors and tasks to start or stop", e);
            // invokeAll cleared the flag when it threw; put it back for code further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts every connector and task in the current assignment.
     *
     * <p>One starting callable is created per assigned connector, followed by one per assigned
     * task, and the whole batch is handed to {@code startAndStop}.
     */
    private void startWork() {
        log.info("Starting connectors and tasks using config offset {}", Long.valueOf(this.assignment.offset()));
        // Typed list instead of a raw ArrayList so the startAndStop call is checked by the compiler.
        final List<Callable<Void>> startActions = new ArrayList<Callable<Void>>();
        for (String connectorName : this.assignment.connectors()) {
            startActions.add(getConnectorStartingCallable(connectorName));
        }
        for (ConnectorTaskId taskId : this.assignment.tasks()) {
            startActions.add(getTaskStartingCallable(taskId));
        }
        startAndStop(startActions);
        log.info("Finished starting connectors and tasks");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the given task on the worker using the connector config, task config and target
     * state found in the current config state.
     *
     * @param connectorTaskId id of the task to start
     * @return the value returned by {@code worker.startTask}
     */
    public boolean startTask(ConnectorTaskId connectorTaskId) {
        log.info("Starting task {}", connectorTaskId);
        final Map<String, String> connectorProps = this.configState.connectorConfig(connectorTaskId.connector());
        return this.worker.startTask(
                connectorTaskId,
                this.configState,
                connectorProps,
                this.configState.taskConfig(connectorTaskId),
                this,
                this.configState.targetState(connectorTaskId.connector()));
    }

    /**
     * Builds a callable that starts the given task.
     *
     * <p>Any throwable raised by {@code startTask} is logged and reported through
     * {@code onFailure} instead of escaping from the callable.
     *
     * @param connectorTaskId id of the task to start
     * @return a callable that always returns {@code null}
     */
    private Callable<Void> getTaskStartingCallable(final ConnectorTaskId connectorTaskId) {
        return new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.startTask(connectorTaskId);
                } catch (Throwable failure) {
                    DistributedHerder.log.error("Couldn't instantiate task {} because it has an invalid task configuration. This task will not execute until reconfigured.", connectorTaskId, failure);
                    DistributedHerder.this.onFailure(connectorTaskId, failure);
                }
                return null;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a callable that stops the given task through {@code worker.stopAndAwaitTask}.
     *
     * @param connectorTaskId id of the task to stop
     * @return a callable that returns {@code null} after the worker call
     */
    public Callable<Void> getTaskStoppingCallable(final ConnectorTaskId connectorTaskId) {
        final Callable<Void> stopAction = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                DistributedHerder.this.worker.stopAndAwaitTask(connectorTaskId);
                return null;
            }
        };
        return stopAction;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts the connector named {@code str} on the worker with the config and target state
     * taken from the current config state.
     *
     * <p>If the worker reports a successful start and the target state is
     * {@code TargetState.STARTED}, a task reconfiguration is triggered as well.
     *
     * @param str name of the connector to start
     * @return the value returned by {@code worker.startConnector}
     */
    public boolean startConnector(String str) {
        log.info("Starting connector {}", str);
        final Map<String, String> connectorProps = this.configState.connectorConfig(str);
        final HerderConnectorContext context = new HerderConnectorContext(this, str);
        final TargetState initialState = this.configState.targetState(str);
        setTaskUserIfNull(connectorProps);
        final boolean started = this.worker.startConnector(str, connectorProps, context, this, initialState);
        if (started) {
            if (initialState == TargetState.STARTED) {
                reconfigureConnectorTasksWithRetry(str);
            }
        }
        return started;
    }

    /**
     * Fills in the task user when the config carries the key but no value.
     *
     * <p>If {@code map} contains {@code TaskConfig.TASK_USER_CONFIG} mapped to {@code null}, the
     * entry is replaced with the short name of the current {@code UserGroupInformation} user.
     * If that user cannot be determined, the failure is logged and the map is left unchanged.
     *
     * @param map connector configuration, modified in place
     */
    private void setTaskUserIfNull(Map<String, String> map) {
        if (map.containsKey(TaskConfig.TASK_USER_CONFIG) && map.get(TaskConfig.TASK_USER_CONFIG) == null) {
            try {
                map.put(TaskConfig.TASK_USER_CONFIG, UserGroupInformation.getCurrentUser().getShortUserName());
            } catch (IOException e) {
                // Pass the exception as the throwable argument so the stack trace is logged too.
                log.error("Can not get the current user", e);
            }
        }
    }

    /**
     * Builds a callable that starts the connector named {@code str}.
     *
     * <p>Any throwable raised by {@code startConnector} is logged and reported through
     * {@code onFailure} instead of escaping from the callable.
     *
     * @param str name of the connector to start
     * @return a callable that always returns {@code null}
     */
    private Callable<Void> getConnectorStartingCallable(final String str) {
        return new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.startConnector(str);
                } catch (Throwable failure) {
                    DistributedHerder.log.error("Couldn't instantiate connector " + str + " because it has an invalid connector configuration. This connector will not execute until reconfigured.", failure);
                    DistributedHerder.this.onFailure(str, failure);
                }
                return null;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a callable that stops the connector named {@code str} on the worker.
     *
     * <p>Any throwable raised by {@code worker.stopConnector} is logged and not rethrown.
     *
     * @param str name of the connector to stop
     * @return a callable that always returns {@code null}
     */
    public Callable<Void> getConnectorStoppingCallable(final String str) {
        return new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    DistributedHerder.this.worker.stopConnector(str);
                } catch (Throwable failure) {
                    DistributedHerder.log.error("Failed to shut down connector " + str, failure);
                }
                return null;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reconfigures the tasks of the connector named {@code str}, retrying after a backoff on
     * failure.
     *
     * <p>If {@code reconfigureConnector} reports a throwable, a new request is queued with a
     * delay of {@code RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS} that calls this method again.
     * That retry request's own callback logs only when it is completed with a throwable.
     *
     * @param str name of the connector whose tasks should be reconfigured
     */
    public void reconfigureConnectorTasksWithRetry(final String str) {
        reconfigureConnector(str, new Callback<Void>() {
            @Override
            public void onCompletion(Throwable th, Void r10) {
                if (th != null) {
                    DistributedHerder.log.error("Failed to reconfigure connector's tasks, retrying after backoff:", th);
                    DistributedHerder.this.addRequest(DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS, new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            DistributedHerder.this.reconfigureConnectorTasksWithRetry(str);
                            return null;
                        }
                    }, new Callback<Void>() {
                        @Override
                        public void onCompletion(Throwable th2, Void r6) {
                            // Log only real failures, matching the guard used by the other request
                            // callbacks in this class; a null throwable is not an error.
                            if (th2 != null) {
                                DistributedHerder.log.error("Unexpected error during connector task reconfiguration: ", th2);
                                DistributedHerder.log.error("Task reconfiguration for {} failed unexpectedly, this connector will not be properly reconfigured unless manually triggered.", str);
                            }
                        }
                    });
                }
            }
        });
    }

    /**
     * Regenerates the task configurations of the connector named {@code str} and publishes them
     * if they differ from what the config state holds.
     *
     * <p>Nothing happens (and {@code callback} is not invoked) when the connector is not running
     * on this worker or when neither the task count nor any task config changed. When there is a
     * change, the leader writes the configs to the config backing store directly; any other
     * worker submits a POST to the leader's {@code /connectors/{name}/tasks} endpoint on the
     * forward-request executor. Any throwable raised in this method is passed to
     * {@code callback}.
     *
     * @param str      name of the connector
     * @param callback receives {@code (null, null)} after a successful publish, or the error
     */
    private void reconfigureConnector(final String str, final Callback<Void> callback) {
        try {
            if (!this.worker.isRunning(str)) {
                log.info("Skipping reconfiguration of connector {} since it is not running", str);
                return;
            }
            final Map<String, String> connectorProps = this.configState.connectorConfig(str);
            final List<Map<String, String>> generatedConfigs = this.worker.connectorTaskConfigs(str, this.worker.isSinkConnector(str) ? new SinkConnectorConfig(plugins(), connectorProps) : new SourceConnectorConfig(plugins(), connectorProps));
            boolean changed = false;
            final int knownTaskCount = this.configState.taskCount(str);
            if (generatedConfigs.size() != knownTaskCount) {
                log.debug("Change in connector task count from {} to {}, writing updated task configurations", Integer.valueOf(knownTaskCount), Integer.valueOf(generatedConfigs.size()));
                changed = true;
            } else {
                // Same count: compare each generated config with the stored one at the same index.
                int index = 0;
                for (Map<String, String> generated : generatedConfigs) {
                    if (!generated.equals(this.configState.taskConfig(new ConnectorTaskId(str, index)))) {
                        log.debug("Change in task configurations, writing updated task configurations");
                        changed = true;
                        break;
                    }
                    index++;
                }
            }
            if (!changed) {
                return;
            }
            final List<Map<String, String>> rawConfigs = reverseTransform(str, this.configState, generatedConfigs);
            if (isLeader()) {
                this.configBackingStore.putTaskConfigs(str, rawConfigs);
                callback.onCompletion(null, null);
                return;
            }
            this.forwardRequestExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        final String tasksUrl = RestServer.urlJoin(DistributedHerder.this.leaderUrl(), "/connectors/" + str + "/tasks");
                        RestClient.httpRequest(tasksUrl, "POST", rawConfigs, null, DistributedHerder.this.config);
                        callback.onCompletion(null, null);
                    } catch (ConnectException forwardFailure) {
                        DistributedHerder.log.error("Request to leader to reconfigure connector tasks failed", forwardFailure);
                        callback.onCompletion(forwardFailure, null);
                    }
                }
            });
        } catch (Throwable problem) {
            callback.onCompletion(problem, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the given callback with a {@code RebalanceNeededException} when the
     * {@code needsReconfigRebalance} flag is set.
     *
     * @param callback callback to complete with the exception when the flag is set
     * @return {@code true} if the callback was completed with the exception, {@code false}
     *         otherwise
     */
    public boolean checkRebalanceNeeded(Callback<?> callback) {
        if (this.needsReconfigRebalance) {
            callback.onCompletion(new RebalanceNeededException("Request cannot be completed because a rebalance is expected"), null);
            return true;
        }
        return false;
    }

    /**
     * Queues a request by delegating to {@link #addRequest(long, Callable, Callback)} with a
     * first argument of {@code 0}.
     *
     * @param callable action of the request
     * @param callback callback of the request
     * @return the queued request
     */
    DistributedHerderRequest addRequest(Callable<Void> callable, Callback<Void> callback) {
        return addRequest(0L, callable, callback);
    }

    /**
     * Creates a request stamped with the current time plus {@code j} and the next sequence
     * number, and adds it to the request set.
     *
     * <p>If the new request is the first element of the set afterwards, the group member is
     * woken up.
     *
     * @param j        value added to {@code time.milliseconds()} for the request's timestamp
     * @param callable action of the request
     * @param callback callback of the request
     * @return the queued request
     */
    DistributedHerderRequest addRequest(long j, Callable<Void> callable, Callback<Void> callback) {
        final long runAtMs = this.time.milliseconds() + j;
        final DistributedHerderRequest queued = new DistributedHerderRequest(runAtMs, this.requestSeqNum.incrementAndGet(), callable, callback);
        this.requests.add(queued);
        final boolean isFirstInSet = peekWithoutException() == queued;
        if (isFirstInSet) {
            this.member.wakeup();
        }
        return queued;
    }

    /**
     * Returns the first queued request without throwing when there is none.
     *
     * @return the first element of the request set, or {@code null} if the set is empty or
     *         {@code first()} throws {@link NoSuchElementException}
     */
    private DistributedHerderRequest peekWithoutException() {
        try {
            return this.requests.isEmpty() ? null : this.requests.first();
        } catch (NoSuchElementException ignored) {
            // Treated the same as an empty set.
            return null;
        }
    }

    /**
     * Wraps a callback so that only failures are forwarded to it.
     *
     * <p>The returned callback passes a non-null throwable on to {@code callback} with a
     * {@code null} result and does nothing when completed without a throwable.
     *
     * @param callback the callback that should receive errors
     * @return a {@code Callback<Void>} that forwards errors to {@code callback}
     */
    private static final Callback<Void> forwardErrorCallback(final Callback<?> callback) {
        return new Callback<Void>() {
            @Override // org.apache.kafka.connect.util.Callback
            public void onCompletion(Throwable th, Void r6) {
                if (th != null) {
                    // Forward to the wrapped callback parameter; "Callback.this" is not a valid
                    // reference here because Callback is not an enclosing class.
                    callback.onCompletion(th, null);
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Calls {@code onDeletion} for every connector that the status backing store knows about
     * but that is absent from a fresh snapshot of the config backing store.
     */
    public void updateDeletedConnectorStatus() {
        final Set<String> configuredConnectors = this.configBackingStore.snapshot().connectors();
        for (String connectorName : this.statusBackingStore.connectors()) {
            if (configuredConnectors.contains(connectorName)) {
                continue;
            }
            log.debug("Cleaning status information for connector {}", connectorName);
            onDeletion(connectorName);
        }
    }

    /**
     * Returns this herder's {@code herderMetrics} field.
     *
     * @return the herder metrics object
     */
    protected HerderMetrics herderMetrics() {
        return this.herderMetrics;
    }
}
