package org.apache.nifi.controller.queue.clustered.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocket;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.class */
public class ConnectionLoadBalanceServer {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
    private static final AtomicLong threadCounter = new AtomicLong(1);
    private final String hostname;
    private final int port;
    private final SSLContext sslContext;
    private final LoadBalanceProtocol loadBalanceProtocol;
    private final int connectionTimeoutMillis;
    private final EventReporter eventReporter;
    private volatile AcceptConnection acceptConnection;
    private volatile ServerSocket serverSocket;
    private final List<CommunicateAction> communicationActions = Collections.synchronizedList(new ArrayList());
    private volatile boolean stopped = true;

    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer$AcceptConnection.class */
    private class AcceptConnection implements Runnable {
        private final ServerSocket serverSocket;
        private volatile boolean stopped = false;

        public AcceptConnection(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        public void stop() {
            this.stopped = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.serverSocket.setSoTimeout(1000);
            } catch (Exception e) {
                ConnectionLoadBalanceServer.logger.error("Failed to set soTimeout on Server Socket for Load Balancing data across cluster", e);
            }
            while (!this.stopped) {
                try {
                    try {
                        Socket accept = this.serverSocket.accept();
                        accept.setSoTimeout(ConnectionLoadBalanceServer.this.connectionTimeoutMillis);
                        CommunicateAction communicateAction = new CommunicateAction(ConnectionLoadBalanceServer.this.loadBalanceProtocol, accept, ConnectionLoadBalanceServer.this.eventReporter);
                        Thread thread = new Thread(communicateAction);
                        thread.setName("Load-Balance Server Thread-" + ConnectionLoadBalanceServer.threadCounter.getAndIncrement());
                        thread.start();
                        ConnectionLoadBalanceServer.this.communicationActions.add(communicateAction);
                    } catch (Exception e2) {
                        ConnectionLoadBalanceServer.logger.error("{} Failed to accept connection from other node in cluster", ConnectionLoadBalanceServer.this, e2);
                    }
                } catch (SocketTimeoutException e3) {
                }
            }
            try {
                this.serverSocket.close();
            } catch (Exception e4) {
                ConnectionLoadBalanceServer.logger.warn("Failed to properly shutdown Server Socket for Load Balancing", e4);
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer$CommunicateAction.class */
    protected static class CommunicateAction implements Runnable {
        private final LoadBalanceProtocol loadBalanceProtocol;
        private final Socket socket;
        private final InputStream in;
        private final OutputStream out;
        private final EventReporter eventReporter;
        private static int EXCEPTION_THRESHOLD_MILLIS = 10000;
        private volatile boolean stopped = false;
        private volatile long tlsErrorLastSeen = -1;

        public CommunicateAction(LoadBalanceProtocol loadBalanceProtocol, Socket socket, EventReporter eventReporter) throws IOException {
            this.loadBalanceProtocol = loadBalanceProtocol;
            this.socket = socket;
            this.eventReporter = eventReporter;
            this.in = new BufferedInputStream(socket.getInputStream());
            this.out = new BufferedOutputStream(socket.getOutputStream());
        }

        public void stop() {
            this.stopped = true;
            IOUtils.closeQuietly(this.socket);
        }

        @Override // java.lang.Runnable
        public void run() {
            String str = "<Unknown Channel>";
            while (!this.stopped) {
                try {
                    str = this.socket.getLocalSocketAddress() + "::" + this.socket.getRemoteSocketAddress();
                    ConnectionLoadBalanceServer.logger.debug("Receiving FlowFiles from Channel {}", str);
                    this.loadBalanceProtocol.receiveFlowFiles(this.socket, this.in, this.out);
                } catch (Exception e) {
                    this.stopped = true;
                    if (this.socket != null) {
                        try {
                            this.socket.close();
                        } catch (IOException e2) {
                            e.addSuppressed(e2);
                        }
                    }
                    if (CertificateUtils.isTlsError(e)) {
                        handleTlsError(str, e);
                    } else {
                        ConnectionLoadBalanceServer.logger.error("Failed to communicate over Channel {}", str, e);
                        this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
                    }
                }
                if (this.socket.isClosed()) {
                    ConnectionLoadBalanceServer.logger.debug("Finished Receiving FlowFiles from Channel {}", str);
                    return;
                }
                continue;
            }
        }

        private boolean handleTlsError(String str, Throwable th) {
            String str2 = "Failed to communicate over Channel " + str + " due to " + th.getLocalizedMessage();
            if (tlsErrorRecentlySeen()) {
                ConnectionLoadBalanceServer.logger.debug(str2);
                return false;
            }
            ConnectionLoadBalanceServer.logger.error(str2);
            ConnectionLoadBalanceServer.logger.info("\tPrinted above error because it has been {} ms since the last printing", Long.valueOf(System.currentTimeMillis() - this.tlsErrorLastSeen));
            this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", str2);
            this.tlsErrorLastSeen = System.currentTimeMillis();
            return true;
        }

        private boolean tlsErrorRecentlySeen() {
            return System.currentTimeMillis() - this.tlsErrorLastSeen < ((long) EXCEPTION_THRESHOLD_MILLIS);
        }
    }

    public ConnectionLoadBalanceServer(String str, int i, SSLContext sSLContext, int i2, LoadBalanceProtocol loadBalanceProtocol, EventReporter eventReporter, int i3) {
        this.hostname = str;
        this.port = i;
        this.sslContext = sSLContext;
        this.loadBalanceProtocol = loadBalanceProtocol;
        this.connectionTimeoutMillis = i3;
        this.eventReporter = eventReporter;
    }

    public void start() throws IOException {
        if (this.stopped) {
            this.stopped = false;
            if (this.serverSocket != null) {
                return;
            }
            try {
                this.serverSocket = createServerSocket();
                this.acceptConnection = new AcceptConnection(this.serverSocket);
                Thread thread = new Thread(this.acceptConnection);
                thread.setName("Receive Queue Load-Balancing Connections");
                thread.start();
            } catch (Exception e) {
                throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the 'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
            }
        }
    }

    public int getPort() {
        return this.serverSocket.getLocalPort();
    }

    public void stop() {
        this.stopped = true;
        if (this.acceptConnection != null) {
            this.acceptConnection.stop();
        }
        Iterator<CommunicateAction> it = this.communicationActions.iterator();
        while (it.hasNext()) {
            it.next().stop();
            it.remove();
        }
    }

    private ServerSocket createServerSocket() throws IOException {
        InetAddress byName = this.hostname == null ? null : InetAddress.getByName(this.hostname);
        if (this.sslContext == null) {
            return new ServerSocket(this.port, 50, InetAddress.getByName(this.hostname));
        }
        SSLServerSocket sSLServerSocket = (SSLServerSocket) this.sslContext.getServerSocketFactory().createServerSocket(this.port, 50, byName);
        sSLServerSocket.setNeedClientAuth(true);
        sSLServerSocket.setEnabledProtocols(TlsConfiguration.getCurrentSupportedTlsProtocolVersions());
        return sSLServerSocket;
    }

    public String toString() {
        return "ConnectionLoadBalanceServer[hostname=" + this.hostname + ", port=" + this.port + ", secure=" + (this.sslContext != null) + "]";
    }
}
