package org.apache.nifi.controller.queue.clustered.client.async.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
import org.apache.nifi.controller.queue.clustered.SimpleLimitThreshold;
import org.apache.nifi.controller.queue.clustered.TransactionThreshold;
import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
import org.apache.nifi.controller.queue.clustered.client.async.nio.LoadBalanceSession;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/client/async/nio/NioAsyncLoadBalanceClient.class */
public class NioAsyncLoadBalanceClient implements AsyncLoadBalanceClient {
    private static final Logger logger = LoggerFactory.getLogger(NioAsyncLoadBalanceClient.class);
    private static final long PENALIZATION_MILLIS = TimeUnit.SECONDS.toMillis(1);
    private final NodeIdentifier nodeIdentifier;
    private final SSLContext sslContext;
    private final int timeoutMillis;
    private final FlowFileContentAccess flowFileContentAccess;
    private final LoadBalanceFlowFileCodec flowFileCodec;
    private final EventReporter eventReporter;
    private final ClusterCoordinator clusterCoordinator;
    private PeerChannel channel;
    private Selector selector;
    private SelectionKey selectionKey;
    private volatile boolean running = false;
    private final AtomicLong penalizationEnd = new AtomicLong(0);
    private final Map<String, RegisteredPartition> registeredPartitions = new HashMap();
    private final Queue<RegisteredPartition> partitionQueue = new LinkedBlockingQueue();
    private final Lock loadBalanceSessionLock = new ReentrantLock();
    private LoadBalanceSession loadBalanceSession = null;

    public NioAsyncLoadBalanceClient(NodeIdentifier nodeIdentifier, SSLContext sSLContext, int i, FlowFileContentAccess flowFileContentAccess, LoadBalanceFlowFileCodec loadBalanceFlowFileCodec, EventReporter eventReporter, ClusterCoordinator clusterCoordinator) {
        this.nodeIdentifier = nodeIdentifier;
        this.sslContext = sSLContext;
        this.timeoutMillis = i;
        this.flowFileContentAccess = flowFileContentAccess;
        this.flowFileCodec = loadBalanceFlowFileCodec;
        this.eventReporter = eventReporter;
        this.clusterCoordinator = clusterCoordinator;
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public NodeIdentifier getNodeIdentifier() {
        return this.nodeIdentifier;
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public synchronized void register(String str, BooleanSupplier booleanSupplier, Supplier<FlowFileRecord> supplier, TransactionFailureCallback transactionFailureCallback, TransactionCompleteCallback transactionCompleteCallback, Supplier<LoadBalanceCompression> supplier2, BooleanSupplier booleanSupplier2) {
        if (this.registeredPartitions.containsKey(str)) {
            throw new IllegalStateException("Connection with ID " + str + " is already registered");
        }
        RegisteredPartition registeredPartition = new RegisteredPartition(str, booleanSupplier, supplier, transactionFailureCallback, transactionCompleteCallback, supplier2, booleanSupplier2);
        this.registeredPartitions.put(str, registeredPartition);
        this.partitionQueue.add(registeredPartition);
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public synchronized void unregister(String str) {
        RegisteredPartition remove = this.registeredPartitions.remove(str);
        if (remove == null) {
            logger.debug("{} Unregistered Connection with ID {} but there were no Registered Partitions", this, str);
            return;
        }
        logger.debug("{} Unregistered Connection with ID {}. Will fail any in-flight FlowFiles for Registered Partition {}", new Object[]{this, str, remove});
        if ((this.loadBalanceSession != null && str.equals(this.loadBalanceSession.getPartition().getConnectionId())) && !this.loadBalanceSession.getSessionState().isComplete() && this.loadBalanceSession.cancel()) {
            List<FlowFileRecord> andPurgeFlowFilesSent = this.loadBalanceSession.getAndPurgeFlowFilesSent();
            logger.debug("{} Triggering failure callback for {} FlowFiles for Registered Partition {} because partition was unregistered", new Object[]{this, Integer.valueOf(andPurgeFlowFilesSent.size()), remove});
            remove.getFailureCallback().onTransactionFailed(andPurgeFlowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
        }
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public synchronized int getRegisteredConnectionCount() {
        return this.registeredPartitions.size();
    }

    private synchronized Map<String, RegisteredPartition> getRegisteredPartitions() {
        return new HashMap(this.registeredPartitions);
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public void start() {
        this.running = true;
        logger.debug("{} started", this);
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public void stop() {
        this.running = false;
        logger.debug("{} stopped", this);
        close();
    }

    private synchronized void close() {
        if (this.selector != null && this.selector.isOpen()) {
            try {
                this.selector.close();
            } catch (Exception e) {
                logger.warn("Failed to close NIO Selector", e);
            }
        }
        if (this.channel != null && this.channel.isOpen()) {
            try {
                this.channel.close();
            } catch (Exception e2) {
                logger.warn("Failed to close Socket Channel to {} for Load Balancing", this.nodeIdentifier, e2);
            }
        }
        this.channel = null;
        this.selector = null;
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public boolean isRunning() {
        return this.running;
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public boolean isPenalized() {
        long j = this.penalizationEnd.get();
        if (j == 0) {
            return false;
        }
        if (j >= System.currentTimeMillis()) {
            return true;
        }
        this.penalizationEnd.compareAndSet(j, 0L);
        return false;
    }

    private void penalize() {
        logger.debug("Penalizing {}", this);
        this.penalizationEnd.set(System.currentTimeMillis() + PENALIZATION_MILLIS);
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public boolean communicate() throws IOException {
        boolean communicate;
        if (!this.running) {
            return false;
        }
        try {
            if (!this.loadBalanceSessionLock.tryLock()) {
                return false;
            }
            try {
                RegisteredPartition registeredPartition = null;
                if (!isConnectionEstablished()) {
                    registeredPartition = getReadyPartition();
                    if (registeredPartition == null) {
                        logger.debug("{} has no connection with data ready to be transmitted so will penalize Client without communicating", this);
                        penalize();
                        this.loadBalanceSessionLock.unlock();
                        return false;
                    }
                    try {
                        establishConnection();
                    } catch (IOException e) {
                        penalize();
                        this.partitionQueue.offer(registeredPartition);
                        for (RegisteredPartition registeredPartition2 : getRegisteredPartitions().values()) {
                            logger.debug("Triggering Transaction Failure Callback for {} with Transaction Phase of CONNECTING", registeredPartition2);
                            registeredPartition2.getFailureCallback().onTransactionFailed(Collections.emptyList(), e, TransactionFailureCallback.TransactionPhase.CONNECTING);
                        }
                        return false;
                    }
                }
                LoadBalanceSession activeTransaction = getActiveTransaction(registeredPartition);
                if (activeTransaction == null) {
                    penalize();
                    this.loadBalanceSessionLock.unlock();
                    return false;
                }
                this.selector.selectNow();
                if (!((activeTransaction.getDesiredReadinessFlag() & this.selectionKey.readyOps()) != 0)) {
                    this.loadBalanceSessionLock.unlock();
                    return false;
                }
                boolean z = false;
                do {
                    try {
                        communicate = activeTransaction.communicate();
                        z = z || communicate;
                    } catch (Exception e2) {
                        logger.error("Failed to communicate with Peer {}", this.nodeIdentifier.toString(), e2);
                        this.eventReporter.reportEvent(Severity.ERROR, "Load Balanced Connection", "Failed to communicate with Peer " + this.nodeIdentifier + " when load balancing data for Connection with ID " + activeTransaction.getPartition().getConnectionId() + " due to " + e2);
                        penalize();
                        activeTransaction.getPartition().getFailureCallback().onTransactionFailed(activeTransaction.getAndPurgeFlowFilesSent(), e2, TransactionFailureCallback.TransactionPhase.SENDING);
                        close();
                        this.loadBalanceSessionLock.unlock();
                        return false;
                    }
                } while (communicate);
                LoadBalanceSession.LoadBalanceSessionState sessionState = activeTransaction.getSessionState();
                if (sessionState.isComplete() && sessionState != LoadBalanceSession.LoadBalanceSessionState.CANCELED) {
                    activeTransaction.getPartition().getSuccessCallback().onTransactionComplete(activeTransaction.getAndPurgeFlowFilesSent(), this.nodeIdentifier);
                }
                this.loadBalanceSessionLock.unlock();
                return z;
            } catch (Exception e3) {
                close();
                this.loadBalanceSession = null;
                throw e3;
            }
        } finally {
            this.loadBalanceSessionLock.unlock();
        }
    }

    @Override // org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClient
    public void nodeDisconnected() {
        if (this.loadBalanceSessionLock.tryLock()) {
            try {
                LoadBalanceSession failoverSession = getFailoverSession();
                if (failoverSession == null) {
                    RegisteredPartition readyPartition = getReadyPartition(false, registeredPartition -> {
                        return registeredPartition.getFailureCallback().isRebalanceOnFailure();
                    });
                    if (readyPartition == null) {
                        return;
                    }
                    this.partitionQueue.offer(readyPartition);
                    failFlowFiles(readyPartition);
                    penalize();
                    return;
                }
                this.loadBalanceSession = null;
                logger.debug("Node {} disconnected so will terminate the Load Balancing Session", this.nodeIdentifier);
                List<FlowFileRecord> andPurgeFlowFilesSent = failoverSession.getAndPurgeFlowFilesSent();
                if (!andPurgeFlowFilesSent.isEmpty()) {
                    failoverSession.getPartition().getFailureCallback().onTransactionFailed(andPurgeFlowFilesSent, TransactionFailureCallback.TransactionPhase.SENDING);
                }
                close();
                penalize();
            } finally {
                this.loadBalanceSessionLock.unlock();
            }
        }
    }

    private void failFlowFiles(RegisteredPartition registeredPartition) {
        FlowFileRecord flowFileRecord;
        TransactionThreshold newTransactionThreshold = newTransactionThreshold();
        ArrayList arrayList = new ArrayList();
        while (!newTransactionThreshold.isThresholdMet() && (flowFileRecord = registeredPartition.getFlowFileRecordSupplier().get()) != null) {
            arrayList.add(flowFileRecord);
            newTransactionThreshold.adjust(1, flowFileRecord.getSize());
        }
        logger.debug("Node {} not connected so failing {} FlowFiles for Load Balancing", this.nodeIdentifier, Integer.valueOf(arrayList.size()));
        registeredPartition.getFailureCallback().onTransactionFailed(arrayList, TransactionFailureCallback.TransactionPhase.SENDING);
    }

    private synchronized LoadBalanceSession getFailoverSession() {
        if (this.loadBalanceSession == null || this.loadBalanceSession.getSessionState().isComplete()) {
            return null;
        }
        return this.loadBalanceSession;
    }

    private RegisteredPartition getReadyPartition() {
        return getReadyPartition(true, registeredPartition -> {
            return true;
        });
    }

    private synchronized RegisteredPartition getReadyPartition(boolean z, Predicate<RegisteredPartition> predicate) {
        ArrayList arrayList = new ArrayList();
        while (true) {
            try {
                RegisteredPartition poll = this.partitionQueue.poll();
                if (poll == null) {
                    this.partitionQueue.addAll(arrayList);
                    return null;
                }
                if (!poll.isEmpty() && !poll.isPenalized() && ((!z || checkNodeConnected(poll)) && predicate.test(poll))) {
                    return poll;
                }
                arrayList.add(poll);
            } finally {
                this.partitionQueue.addAll(arrayList);
            }
        }
    }

    private synchronized boolean checkNodeConnected(RegisteredPartition registeredPartition) {
        NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(this.nodeIdentifier);
        boolean z = connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
        if (!z) {
            failFlowFiles(registeredPartition);
        }
        return z;
    }

    private synchronized LoadBalanceSession getActiveTransaction(RegisteredPartition registeredPartition) {
        if (this.loadBalanceSession != null && !this.loadBalanceSession.getSessionState().isComplete()) {
            return this.loadBalanceSession;
        }
        RegisteredPartition readyPartition = registeredPartition == null ? getReadyPartition() : registeredPartition;
        if (readyPartition == null) {
            return null;
        }
        this.loadBalanceSession = new LoadBalanceSession(readyPartition, this.flowFileContentAccess, this.flowFileCodec, this.channel, this.timeoutMillis, newTransactionThreshold());
        this.partitionQueue.offer(readyPartition);
        return this.loadBalanceSession;
    }

    private TransactionThreshold newTransactionThreshold() {
        return new SimpleLimitThreshold(1000, 10000000L);
    }

    private synchronized boolean isConnectionEstablished() {
        return (this.selector == null || this.channel == null || !this.channel.isConnected()) ? false : true;
    }

    private void establishConnection() throws IOException {
        SocketChannel socketChannel = null;
        try {
            synchronized (this) {
                if (isConnectionEstablished()) {
                    return;
                }
                this.selector = Selector.open();
                socketChannel = createChannel();
                socketChannel.configureBlocking(true);
                PeerChannel createPeerChannel = createPeerChannel(socketChannel, socketChannel.getLocalAddress() + "::" + socketChannel.getRemoteAddress());
                this.channel = createPeerChannel;
                createPeerChannel.performHandshake();
                synchronized (this) {
                    socketChannel.configureBlocking(false);
                    this.selectionKey = socketChannel.register(this.selector, 5);
                }
            }
        } catch (Exception e) {
            logger.error("Unable to connect to {} for load balancing", this.nodeIdentifier, e);
            close();
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
            }
            throw e;
        }
    }

    private PeerChannel createPeerChannel(SocketChannel socketChannel, String str) {
        if (this.sslContext == null) {
            logger.debug("No SSL Context is available so will not perform SSL Handshake with Peer {}", str);
            return new PeerChannel(socketChannel, null, str);
        }
        logger.debug("Performing SSL Handshake with Peer {}", str);
        SSLEngine createSSLEngine = this.sslContext.createSSLEngine();
        createSSLEngine.setUseClientMode(true);
        createSSLEngine.setNeedClientAuth(true);
        return new PeerChannel(socketChannel, createSSLEngine, str);
    }

    private SocketChannel createChannel() throws IOException {
        SocketChannel open = SocketChannel.open();
        try {
            open.configureBlocking(true);
            Socket socket = open.socket();
            socket.setSoTimeout(this.timeoutMillis);
            socket.connect(new InetSocketAddress(this.nodeIdentifier.getLoadBalanceAddress(), this.nodeIdentifier.getLoadBalancePort()));
            socket.setSoTimeout(this.timeoutMillis);
            return open;
        } catch (Exception e) {
            try {
                open.close();
            } catch (Exception e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    public String toString() {
        return "NioAsyncLoadBalanceClient[nodeId=" + this.nodeIdentifier + "]";
    }
}
