package org.apache.nifi.controller.queue.clustered.client.async.nio;

import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClientTask.class */
public class NioAsyncLoadBalanceClientTask implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClientTask.class);
    private static final String EVENT_CATEGORY = "Load-Balanced Connection";
    private final NioAsyncLoadBalanceClientRegistry clientRegistry;
    private final ClusterCoordinator clusterCoordinator;
    private final EventReporter eventReporter;
    private final Map<NodeIdentifier, NodeConnectionState> nodeConnectionStates = new HashMap();
    private volatile boolean running = true;

    public NioAsyncLoadBalanceClientTask(NioAsyncLoadBalanceClientRegistry nioAsyncLoadBalanceClientRegistry, ClusterCoordinator clusterCoordinator, EventReporter eventReporter) {
        this.clientRegistry = nioAsyncLoadBalanceClientRegistry;
        this.clusterCoordinator = clusterCoordinator;
        this.eventReporter = eventReporter;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.running) {
            try {
                boolean z = false;
                for (AsyncLoadBalanceClient asyncLoadBalanceClient : this.clientRegistry.getAllClients()) {
                    if (!asyncLoadBalanceClient.isRunning()) {
                        logger.trace("Client {} is not running so will not communicate with it", asyncLoadBalanceClient);
                    } else if (asyncLoadBalanceClient.isPenalized()) {
                        logger.trace("Client {} is penalized so will not communicate with it", asyncLoadBalanceClient);
                    } else {
                        NodeIdentifier nodeIdentifier = asyncLoadBalanceClient.getNodeIdentifier();
                        NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(nodeIdentifier);
                        if (connectionStatus == null) {
                            logger.debug("Could not determine Connection Status for Node with ID {}; will not communicate with it", nodeIdentifier);
                        } else {
                            NodeConnectionState state = connectionStatus.getState();
                            NodeConnectionState put = this.nodeConnectionStates.put(asyncLoadBalanceClient.getNodeIdentifier(), state);
                            if (state == NodeConnectionState.CONNECTED || put != NodeConnectionState.CONNECTED) {
                                while (asyncLoadBalanceClient.communicate()) {
                                    try {
                                        z = true;
                                        logger.trace("Client {} was able to make progress communicating with peer. Will continue to communicate with peer.", asyncLoadBalanceClient);
                                    } catch (Exception e) {
                                        this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to communicate with Peer " + asyncLoadBalanceClient.getNodeIdentifier() + " while trying to load balance data across the cluster due to " + e.toString());
                                        logger.error("Failed to communicate with Peer {} while trying to load balance data across the cluster.", asyncLoadBalanceClient.getNodeIdentifier(), e);
                                    }
                                }
                                logger.trace("Client {} was no longer able to make progress communicating with peer. Will move on to the next client", asyncLoadBalanceClient);
                            } else {
                                logger.debug("Notifying Client {} that node is not connected because current state is {}", asyncLoadBalanceClient, state);
                                asyncLoadBalanceClient.nodeDisconnected();
                            }
                        }
                    }
                }
                if (!z) {
                    logger.trace("Was unable to communicate with any client. Will sleep for 10 milliseconds.");
                    Thread.sleep(10L);
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e3) {
                logger.error("Failed to communicate with peer while trying to load balance data across the cluster", e3);
                this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to comunicate with Peer while trying to load balance data across the cluster due to " + e3);
            }
        }
    }

    public void stop() {
        this.running = false;
    }
}
