package org.apache.nifi.cluster.protocol;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
import org.apache.nifi.cluster.protocol.message.CommsTimingDetails;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.class */
/**
 * Base {@link NodeProtocolSender} that exchanges protocol messages over plain sockets.
 *
 * <p>Connection and cluster-workload requests are sent to the address returned by
 * {@link #getServiceAddress()}, which subclasses must supply. Heartbeats are sent to the
 * address passed in by the caller.
 */
public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
    private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
    private final SocketConfiguration socketConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;
    private final ProtocolMessageMarshaller<ProtocolMessage> marshaller;
    private final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;

    /**
     * Creates a sender that opens sockets with the given configuration and (un)marshals
     * messages with the given protocol context.
     *
     * @param socketConfiguration configuration handed to {@code SocketUtils.createSocket}
     * @param protocolContext source of the marshaller and unmarshaller used by this sender
     */
    public AbstractNodeProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
        this.marshaller = protocolContext.createMarshaller();
        this.unmarshaller = protocolContext.createUnmarshaller();
    }

    /**
     * Sends a connection request to the address returned by {@link #getServiceAddress()} and
     * returns the response.
     *
     * @param connectionRequestMessage the request to send
     * @param z when {@code false}, the request is rejected if the service address equals the
     *          proposed node identifier's own socket address and port
     * @return the connection response received from the remote side
     * @throws ProtocolException if the service address cannot be determined, the socket cannot be
     *         created, (un)marshalling fails, or the response is not a {@code CONNECTION_RESPONSE}
     * @throws UnknownServiceAddressException if connecting to self is not allowed and the service
     *         address is this node
     */
    @Override // org.apache.nifi.cluster.protocol.NodeProtocolSender
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage connectionRequestMessage, boolean z) throws ProtocolException, UnknownServiceAddressException {
        Socket socket = null;
        try {
            final InetSocketAddress serviceAddress;
            try {
                serviceAddress = getServiceAddress();
            } catch (IOException e) {
                throw new ProtocolException("Could not determined address of Cluster Coordinator", e);
            }

            if (!z) {
                validateNotConnectingToSelf(connectionRequestMessage, serviceAddress);
            }

            logger.info("Cluster Coordinator is located at {}. Will send Cluster Connection Request to this address", serviceAddress);
            socket = createSocket(serviceAddress);

            try {
                this.protocolContext.createMarshaller().marshal(connectionRequestMessage, socket.getOutputStream());
            } catch (IOException e) {
                throw new ProtocolException("Failed marshalling '" + connectionRequestMessage.getType() + "' protocol message due to: " + e, e);
            }

            final ProtocolMessage response;
            try {
                response = this.protocolContext.createUnmarshaller().unmarshal(socket.getInputStream());
            } catch (IOException e) {
                throw new ProtocolException("Failed unmarshalling '" + ProtocolMessage.MessageType.CONNECTION_RESPONSE + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + e, e);
            }

            if (ProtocolMessage.MessageType.CONNECTION_RESPONSE != response.getType()) {
                throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.CONNECTION_RESPONSE + "' but found '" + response.getType() + "'");
            }
            return (ConnectionResponseMessage) response;
        } finally {
            // Single cleanup point for both the success and every failure path; socket may still be null here.
            SocketUtils.closeQuietly(socket);
        }
    }

    /**
     * Rejects the request when the proposed node identifier's socket address and port match the
     * given service address. A request without a proposed node identifier is accepted.
     *
     * @throws UnknownServiceAddressException if the service address is this node
     */
    private void validateNotConnectingToSelf(ConnectionRequestMessage connectionRequestMessage, InetSocketAddress inetSocketAddress) {
        NodeIdentifier proposedNodeIdentifier = connectionRequestMessage.getConnectionRequest().getProposedNodeIdentifier();
        if (proposedNodeIdentifier == null) {
            return;
        }
        String socketAddress = proposedNodeIdentifier.getSocketAddress();
        int socketPort = proposedNodeIdentifier.getSocketPort();
        if (Objects.equals(socketAddress, inetSocketAddress.getHostString()) && socketPort == inetSocketAddress.getPort()) {
            throw new UnknownServiceAddressException("Cluster Coordinator is currently " + inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort() + ", which is this node, but connecting to self is not allowed at this phase of the lifecycle. This node must wait for a new Cluster Coordinator to be elected before connecting to the cluster.");
        }
    }

    /**
     * Sends a heartbeat to the given address and returns the response, annotated with the timing
     * details collected while sending.
     *
     * @param heartbeatMessage the heartbeat to send
     * @param str target address in {@code <hostname>:<port>} form; the port is whatever follows
     *            the last colon, so hosts that themselves contain colons are tolerated
     * @return the heartbeat response
     * @throws ProtocolException if the address is malformed, communication fails, or the response
     *         is not a {@code HEARTBEAT_RESPONSE}
     */
    @Override // org.apache.nifi.cluster.protocol.NodeProtocolSender
    public HeartbeatResponseMessage heartbeat(HeartbeatMessage heartbeatMessage, String str) throws ProtocolException {
        // Split on the LAST colon so that a host containing colons (e.g. an IPv6 literal) is not cut short.
        final int separator = str == null ? -1 : str.lastIndexOf(':');
        if (separator < 0) {
            throw new ProtocolException("Invalid address '" + str + "' for heartbeat; expected <hostname>:<port>");
        }
        final String hostname = str.substring(0, separator);
        final int port;
        try {
            port = Integer.parseInt(str.substring(separator + 1));
        } catch (NumberFormatException e) {
            throw new ProtocolException("Invalid port in address '" + str + "' for heartbeat; expected <hostname>:<port>", e);
        }

        CommsTimingDetails commsTimingDetails = new CommsTimingDetails();
        ProtocolMessage response = sendProtocolMessage(heartbeatMessage, hostname, port, commsTimingDetails);
        if (ProtocolMessage.MessageType.HEARTBEAT_RESPONSE != response.getType()) {
            throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.HEARTBEAT_RESPONSE + "' but found '" + response.getType() + "'");
        }
        HeartbeatResponseMessage heartbeatResponseMessage = (HeartbeatResponseMessage) response;
        heartbeatResponseMessage.setCommsTimingDetails(commsTimingDetails);
        return heartbeatResponseMessage;
    }

    /**
     * Sends a cluster-workload request to the address returned by {@link #getServiceAddress()}.
     *
     * @param clusterWorkloadRequestMessage the request to send
     * @return the cluster-workload response
     * @throws ProtocolException if the service address cannot be determined, communication fails,
     *         or the response is not a {@code CLUSTER_WORKLOAD_RESPONSE}
     */
    @Override // org.apache.nifi.cluster.protocol.NodeProtocolSender
    public ClusterWorkloadResponseMessage clusterWorkload(ClusterWorkloadRequestMessage clusterWorkloadRequestMessage) throws ProtocolException {
        try {
            InetSocketAddress serviceAddress = getServiceAddress();
            // Timing details are collected by sendProtocolMessage but not reported for this message type.
            ProtocolMessage response = sendProtocolMessage(clusterWorkloadRequestMessage, serviceAddress.getHostName(), serviceAddress.getPort(), new CommsTimingDetails());
            if (ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE == response.getType()) {
                return (ClusterWorkloadResponseMessage) response;
            }
            throw new ProtocolException("Expected message type '" + ProtocolMessage.MessageType.CLUSTER_WORKLOAD_RESPONSE + "' but found '" + response.getType() + "'");
        } catch (IOException e) {
            throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
        }
    }

    /**
     * Opens a socket to the given address using this sender's socket configuration.
     *
     * @throws ProtocolException wrapping any {@link IOException} raised while creating the socket
     */
    private Socket createSocket(InetSocketAddress inetSocketAddress) {
        try {
            return SocketUtils.createSocket(inetSocketAddress, this.socketConfiguration);
        } catch (IOException e) {
            if (inetSocketAddress == null) {
                throw new ProtocolException("Failed to create socket due to: " + e, e);
            }
            throw new ProtocolException("Failed to create socket to " + inetSocketAddress + " due to: " + e, e);
        }
    }

    /**
     * @return the socket configuration this sender was constructed with
     */
    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    /**
     * Sends a message to {@code str:i}, waits for the response and records how long each phase took.
     *
     * @param protocolMessage the message to send
     * @param str hostname to connect to
     * @param i port to connect to
     * @param commsTimingDetails receives the DNS-lookup, connect, send, first-byte and
     *        full-response durations (milliseconds); only populated when the exchange succeeds
     * @return the unmarshalled response
     * @throws ProtocolException if the socket cannot be opened or closed, or if sending the
     *         request or receiving the response fails
     */
    private ProtocolMessage sendProtocolMessage(ProtocolMessage protocolMessage, String str, int i, CommsTimingDetails commsTimingDetails) {
        final long dnsLookupStart = System.currentTimeMillis();
        final InetSocketAddress address = new InetSocketAddress(str, i);
        final long connectStart = System.currentTimeMillis();

        // try-with-resources guarantees the socket is closed even if obtaining one of its streams fails.
        try (Socket socket = SocketUtils.createSocket(address, this.socketConfiguration);
             BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
             BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream())) {

            final long sendStart = System.currentTimeMillis();
            try {
                this.marshaller.marshal(protocolMessage, out);
                // Make sure the request has left the buffer before blocking on the response;
                // this is a no-op if the marshaller already flushed.
                out.flush();
            } catch (IOException e) {
                throw new ProtocolException("Failed marshalling '" + protocolMessage.getType() + "' protocol message", e);
            }

            final long receiveStart = System.currentTimeMillis();
            final long firstByteReceived;
            final ProtocolMessage response;
            try {
                // Block until the first response byte arrives, then rewind so the unmarshaller
                // still sees the complete stream. The byte value itself is irrelevant.
                in.mark(1);
                in.read();
                in.reset();
                firstByteReceived = System.currentTimeMillis();
                response = this.unmarshaller.unmarshal(in);
            } catch (IOException e) {
                throw new ProtocolException("Failed unmarshalling response to '" + protocolMessage.getType() + "' protocol message from " + socket.getRemoteSocketAddress(), e);
            }
            final long responseReceived = System.currentTimeMillis();

            commsTimingDetails.setDnsLookupMillis(connectStart - dnsLookupStart);
            commsTimingDetails.setConnectMillis(sendStart - connectStart);
            commsTimingDetails.setSendRequestMillis(receiveStart - sendStart);
            commsTimingDetails.setReceiveFirstByteMillis(firstByteReceived - receiveStart);
            commsTimingDetails.setReceiveFullResponseMillis(responseReceived - receiveStart);
            return response;
        } catch (IOException e) {
            // Reached when the socket or its streams cannot be opened or closed.
            throw new ProtocolException("Failed to send message to Cluster Coordinator", e);
        }
    }

    /**
     * Supplies the address that connection and cluster-workload requests are sent to.
     *
     * @return the address to connect to
     * @throws IOException if the address cannot be determined
     */
    protected abstract InetSocketAddress getServiceAddress() throws IOException;
}
