package org.apache.nifi.controller.queue.clustered.client.async.nio;

import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.Supplier;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
import org.apache.nifi.controller.queue.clustered.TransactionThreshold;
import org.apache.nifi.controller.queue.clustered.client.LoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.server.TransactionAbortedException;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession.class */
public class LoadBalanceSession {
    private static final Logger logger = LoggerFactory.getLogger(LoadBalanceSession.class);
    static final int MAX_DATA_FRAME_SIZE = 65535;
    private final RegisteredPartition partition;
    private final Supplier<FlowFileRecord> flowFileSupplier;
    private final FlowFileContentAccess flowFileContentAccess;
    private final LoadBalanceFlowFileCodec flowFileCodec;
    private final PeerChannel channel;
    private final int timeoutMillis;
    private final String peerDescription;
    private final String connectionId;
    private final TransactionThreshold transactionThreshold;
    private ByteBuffer preparedFrame;
    private FlowFileRecord currentFlowFile;
    private InputStream flowFileInputStream;
    private long readTimeout;
    final VersionNegotiator negotiator = new StandardVersionNegotiator(new int[]{1});
    private int protocolVersion = 1;
    private final Checksum checksum = new CRC32();
    private final List<FlowFileRecord> flowFilesSent = new ArrayList();
    private TransactionPhase phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
    private final byte[] byteBuffer = new byte[MAX_DATA_FRAME_SIZE];
    private volatile LoadBalanceSessionState sessionState = LoadBalanceSessionState.ACTIVE;

    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession$LoadBalanceSessionState.class */
    public enum LoadBalanceSessionState {
        ACTIVE(false),
        COMPLETED_SUCCESSFULLY(true),
        COMPLETED_EXCEPTIONALLY(true),
        CANCELED(true);

        private final boolean complete;

        LoadBalanceSessionState(boolean z) {
            this.complete = z;
        }

        public boolean isComplete() {
            return this.complete;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/client/async/nio/LoadBalanceSession$TransactionPhase.class */
    public enum TransactionPhase {
        RECOMMEND_PROTOCOL_VERSION(4),
        RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT(1),
        RECEIVE_RECOMMENDED_PROTOCOL_VERSION(1),
        ABORT_PROTOCOL_NEGOTIATION(4),
        SEND_CONNECTION_ID(4),
        CHECK_SPACE(4),
        RECEIVE_SPACE_RESPONSE(1),
        SEND_FLOWFILE_DEFINITION(4),
        SEND_FLOWFILE_CONTENTS(4),
        GET_NEXT_FLOWFILE(4),
        SEND_CHECKSUM(4),
        VERIFY_CHECKSUM(1),
        SEND_TRANSACTION_COMPLETE(4),
        CONFIRM_TRANSACTION_COMPLETE(1);

        private final int requiredSelectionKey;

        TransactionPhase(int i) {
            this.requiredSelectionKey = i;
        }

        public int getRequiredSelectionKey() {
            return this.requiredSelectionKey;
        }
    }

    public LoadBalanceSession(RegisteredPartition registeredPartition, FlowFileContentAccess flowFileContentAccess, LoadBalanceFlowFileCodec loadBalanceFlowFileCodec, PeerChannel peerChannel, int i, TransactionThreshold transactionThreshold) {
        this.partition = registeredPartition;
        this.flowFileSupplier = registeredPartition.getFlowFileRecordSupplier();
        this.connectionId = registeredPartition.getConnectionId();
        this.flowFileContentAccess = flowFileContentAccess;
        this.flowFileCodec = loadBalanceFlowFileCodec;
        this.channel = peerChannel;
        this.peerDescription = peerChannel.getPeerDescription();
        if (i < 1) {
            throw new IllegalArgumentException();
        }
        this.timeoutMillis = i;
        this.transactionThreshold = transactionThreshold;
    }

    public RegisteredPartition getPartition() {
        return this.partition;
    }

    public synchronized int getDesiredReadinessFlag() {
        return this.phase.getRequiredSelectionKey();
    }

    public synchronized List<FlowFileRecord> getAndPurgeFlowFilesSent() {
        ArrayList arrayList = new ArrayList(this.flowFilesSent);
        this.flowFilesSent.clear();
        return arrayList;
    }

    public synchronized LoadBalanceSessionState getSessionState() {
        return this.sessionState;
    }

    public synchronized boolean communicate() throws IOException {
        if (this.sessionState.isComplete()) {
            return false;
        }
        try {
            if (this.preparedFrame != null && this.preparedFrame.hasRemaining()) {
                logger.trace("Current Frame is already available. Will continue writing current frame to channel");
                return this.channel.write(this.preparedFrame) > 0;
            }
            switch (this.phase) {
                case RECEIVE_SPACE_RESPONSE:
                    return receiveSpaceAvailableResponse();
                case VERIFY_CHECKSUM:
                    return verifyChecksum();
                case CONFIRM_TRANSACTION_COMPLETE:
                    return confirmTransactionComplete();
                case RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT:
                    return receiveProtocolVersionAcknowledgment();
                case RECEIVE_RECOMMENDED_PROTOCOL_VERSION:
                    return receiveRecommendedProtocolVersion();
                default:
                    this.preparedFrame = this.channel.prepareForWrite(getDataFrame());
                    return this.channel.write(this.preparedFrame) > 0;
            }
        } catch (Exception e) {
            this.sessionState = LoadBalanceSessionState.COMPLETED_EXCEPTIONALLY;
            throw e;
        }
    }

    public synchronized boolean cancel() {
        if (this.sessionState.isComplete()) {
            return false;
        }
        this.sessionState = LoadBalanceSessionState.CANCELED;
        return true;
    }

    private boolean confirmTransactionComplete() throws IOException {
        logger.debug("Confirming Transaction Complete for Peer {}", this.peerDescription);
        OptionalInt read = this.channel.read();
        if (!read.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to confirm the transaction is complete");
            }
            return false;
        }
        int asInt = read.getAsInt();
        if (asInt < 0) {
            throw new EOFException("Confirmed checksum when writing data to Peer " + this.peerDescription + " but encountered End-of-File when expecting a Transaction Complete confirmation");
        }
        if (asInt == 36) {
            throw new TransactionAbortedException("Confirmed checksum when writing data to Peer " + this.peerDescription + " but Peer aborted transaction instead of completing it");
        }
        if (asInt != 37) {
            throw new IOException("Expected a CONFIRM_COMPLETE_TRANSACTION response from Peer " + this.peerDescription + " but received a value of " + asInt);
        }
        this.sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
        logger.debug("Successfully completed Transaction to send {} FlowFiles to Peer {} for Connection {}", new Object[]{Integer.valueOf(this.flowFilesSent.size()), this.peerDescription, this.connectionId});
        return true;
    }

    private boolean verifyChecksum() throws IOException {
        logger.debug("Verifying Checksum for Peer {}", this.peerDescription);
        OptionalInt read = this.channel.read();
        if (!read.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to verify the checksum");
            }
            return false;
        }
        int asInt = read.getAsInt();
        if (asInt < 0) {
            throw new EOFException("Encountered End-of-File when trying to verify Checksum with Peer " + this.peerDescription);
        }
        if (asInt == 34) {
            throw new TransactionAbortedException("After transferring FlowFiles to Peer " + this.peerDescription + " received a REJECT_CHECKSUM response. Aborting transaction.");
        }
        if (asInt != 33) {
            throw new TransactionAbortedException("After transferring FlowFiles to Peer " + this.peerDescription + " received an unexpected response code " + asInt + ". Aborting transaction.");
        }
        logger.debug("Checksum confirmed. Writing COMPLETE_TRANSACTION flag");
        this.phase = TransactionPhase.SEND_TRANSACTION_COMPLETE;
        return true;
    }

    private ByteBuffer getDataFrame() throws IOException {
        switch (this.phase) {
            case RECOMMEND_PROTOCOL_VERSION:
                return recommendProtocolVersion();
            case ABORT_PROTOCOL_NEGOTIATION:
                return abortProtocolNegotiation();
            case SEND_CONNECTION_ID:
                return getConnectionId();
            case CHECK_SPACE:
                return checkSpace();
            case GET_NEXT_FLOWFILE:
                return getNextFlowFile();
            case SEND_FLOWFILE_DEFINITION:
            case SEND_FLOWFILE_CONTENTS:
                return getFlowFileContent();
            case SEND_CHECKSUM:
                return getChecksum();
            case SEND_TRANSACTION_COMPLETE:
                return getTransactionComplete();
            default:
                logger.debug("Phase of {}, returning null ByteBuffer", this.phase);
                return null;
        }
    }

    private ByteBuffer getTransactionComplete() {
        logger.debug("Sending Transaction Complete Indicator to Peer {}", this.peerDescription);
        ByteBuffer allocate = ByteBuffer.allocate(1);
        allocate.put((byte) 35);
        allocate.rewind();
        this.readTimeout = System.currentTimeMillis() + this.timeoutMillis;
        this.phase = TransactionPhase.CONFIRM_TRANSACTION_COMPLETE;
        return allocate;
    }

    private ByteBuffer getChecksum() {
        logger.debug("Sending Checksum of {} to Peer {}", Long.valueOf(this.checksum.getValue()), this.peerDescription);
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putLong(this.checksum.getValue());
        this.readTimeout = System.currentTimeMillis() + this.timeoutMillis;
        this.phase = TransactionPhase.VERIFY_CHECKSUM;
        allocate.rewind();
        return allocate;
    }

    private ByteBuffer getFlowFileContent() throws IOException {
        ByteBuffer allocate;
        try {
            if (this.flowFileInputStream == null) {
                this.flowFileInputStream = this.flowFileContentAccess.read(this.currentFlowFile);
            }
            int fillBuffer = StreamUtils.fillBuffer(this.flowFileInputStream, this.byteBuffer, false);
            if (fillBuffer < 1) {
                this.flowFileInputStream.close();
                this.flowFileInputStream = null;
                this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
                ByteBuffer allocate2 = ByteBuffer.allocate(1);
                allocate2.put((byte) 64);
                allocate2.rewind();
                this.checksum.update(64);
                logger.debug("Sending NO_DATA_FRAME indicator to Peer {}", this.peerDescription);
                return allocate2;
            }
            logger.trace("Sending Data Frame that is {} bytes long to Peer {}", Integer.valueOf(fillBuffer), this.peerDescription);
            if (this.partition.getCompression() == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT) {
                byte[] compressDataFrame = compressDataFrame(this.byteBuffer, fillBuffer);
                int length = compressDataFrame.length;
                allocate = ByteBuffer.allocate(5 + length);
                allocate.put((byte) 66);
                allocate.putInt(length);
                allocate.put(compressDataFrame, 0, length);
            } else {
                allocate = ByteBuffer.allocate(5 + fillBuffer);
                allocate.put((byte) 66);
                allocate.putInt(fillBuffer);
                allocate.put(this.byteBuffer, 0, fillBuffer);
            }
            byte[] array = allocate.array();
            this.checksum.update(array, 0, array.length);
            this.phase = TransactionPhase.SEND_FLOWFILE_CONTENTS;
            allocate.rewind();
            return allocate;
        } catch (ContentNotFoundException e) {
            throw new ContentNotFoundException(this.currentFlowFile, e.getMissingClaim(), e.getMessage());
        }
    }

    private byte[] compressDataFrame(byte[] bArr, int i) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            GZIPOutputStream gZIPOutputStream = new GZIPOutputStream(byteArrayOutputStream, 1);
            try {
                gZIPOutputStream.write(bArr, 0, i);
                gZIPOutputStream.close();
                byte[] byteArray = byteArrayOutputStream.toByteArray();
                gZIPOutputStream.close();
                byteArrayOutputStream.close();
                return byteArray;
            } finally {
            }
        } catch (Throwable th) {
            try {
                byteArrayOutputStream.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private ByteBuffer getNextFlowFile() throws IOException {
        if (this.transactionThreshold.isThresholdMet()) {
            this.currentFlowFile = null;
            logger.debug("Transaction Threshold reached sending to Peer {}; Transitioning phase to SEND_CHECKSUM", this.peerDescription);
        } else {
            this.currentFlowFile = this.flowFileSupplier.get();
            if (this.currentFlowFile == null) {
                logger.debug("No more FlowFiles to send to Peer {}; Transitioning phase to SEND_CHECKSUM", this.peerDescription);
            }
        }
        if (this.currentFlowFile == null) {
            this.phase = TransactionPhase.SEND_CHECKSUM;
            return noMoreFlowFiles();
        }
        this.transactionThreshold.adjust(1, this.currentFlowFile.getSize());
        logger.debug("Next FlowFile to send to Peer {} is {}", this.peerDescription, this.currentFlowFile);
        this.flowFilesSent.add(this.currentFlowFile);
        LoadBalanceCompression compression = this.partition.getCompression();
        boolean z = compression != LoadBalanceCompression.DO_NOT_COMPRESS;
        logger.debug("Compression to use for sending to Peer {} is {}", this.peerDescription, compression);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            if (z) {
                GZIPOutputStream gZIPOutputStream = new GZIPOutputStream(byteArrayOutputStream, 1);
                try {
                    OutputStream byteCountingOutputStream = new ByteCountingOutputStream(gZIPOutputStream);
                    try {
                        this.flowFileCodec.encode(this.currentFlowFile, byteCountingOutputStream);
                        byteCountingOutputStream.close();
                        gZIPOutputStream.close();
                    } catch (Throwable th) {
                        try {
                            byteCountingOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } finally {
                }
            } else {
                this.flowFileCodec.encode(this.currentFlowFile, byteArrayOutputStream);
            }
            byte[] byteArray = byteArrayOutputStream.toByteArray();
            byteArrayOutputStream.close();
            int length = byteArray.length;
            ByteBuffer allocate = ByteBuffer.allocate(byteArray.length + 5);
            allocate.put((byte) 49);
            this.checksum.update(49);
            allocate.putInt(length);
            this.checksum.update((length >> 24) & 255);
            this.checksum.update((length >> 16) & 255);
            this.checksum.update((length >> 8) & 255);
            this.checksum.update(length & 255);
            allocate.put(byteArray);
            this.checksum.update(byteArray, 0, byteArray.length);
            this.phase = TransactionPhase.SEND_FLOWFILE_DEFINITION;
            allocate.rewind();
            return allocate;
        } catch (Throwable th3) {
            try {
                byteArrayOutputStream.close();
            } catch (Throwable th4) {
                th3.addSuppressed(th4);
            }
            throw th3;
        }
    }

    private ByteBuffer recommendProtocolVersion() {
        logger.debug("Recommending to Peer {} that Protocol Version {} be used", this.peerDescription, Integer.valueOf(this.protocolVersion));
        ByteBuffer allocate = ByteBuffer.allocate(1);
        allocate.put((byte) this.protocolVersion);
        allocate.rewind();
        this.readTimeout = System.currentTimeMillis() + this.timeoutMillis;
        this.phase = TransactionPhase.RECEIVE_PROTOCOL_VERSION_ACKNOWLEDGMENT;
        return allocate;
    }

    private boolean receiveProtocolVersionAcknowledgment() throws IOException {
        logger.debug("Confirming Protocol Version for Peer {}", this.peerDescription);
        OptionalInt read = this.channel.read();
        if (!read.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to acknowledge Protocol Version");
            }
            return false;
        }
        int asInt = read.getAsInt();
        if (asInt < 0) {
            throw new EOFException("Encounter End-of-File with Peer " + this.peerDescription + " when expecting a Protocol Version Acknowledgment");
        }
        if (asInt == 16) {
            logger.debug("Peer {} accepted Protocol Version {}", this.peerDescription, Integer.valueOf(this.protocolVersion));
            this.phase = TransactionPhase.SEND_CONNECTION_ID;
            return true;
        }
        if (asInt != 17) {
            throw new IOException("Failed to negotiate Protocol Version with Peer " + this.peerDescription + ". Recommended version " + this.protocolVersion + " but instead of an ACCEPT or REJECT response got back a response of " + asInt);
        }
        logger.debug("Recommended using Protocol Version of {} with Peer {} but received REQUEST_DIFFERENT_VERSION response", Integer.valueOf(this.protocolVersion), this.peerDescription);
        this.readTimeout = System.currentTimeMillis() + this.timeoutMillis;
        this.phase = TransactionPhase.RECEIVE_RECOMMENDED_PROTOCOL_VERSION;
        return true;
    }

    private boolean receiveRecommendedProtocolVersion() throws IOException {
        logger.debug("Receiving Protocol Version from Peer {}", this.peerDescription);
        OptionalInt read = this.channel.read();
        if (!read.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to recommend Protocol Version");
            }
            return false;
        }
        int asInt = read.getAsInt();
        if (asInt < 0) {
            throw new EOFException("Encounter End-of-File with Peer " + this.peerDescription + " when expecting a Protocol Version Recommendation");
        }
        if (this.negotiator.isVersionSupported(asInt)) {
            this.protocolVersion = asInt;
            this.phase = TransactionPhase.SEND_CONNECTION_ID;
            logger.debug("Peer {} recommended Protocol Version of {}. Accepting version.", this.peerDescription, Integer.valueOf(asInt));
            return true;
        }
        Integer preferredVersion = this.negotiator.getPreferredVersion(asInt);
        if (preferredVersion == null) {
            logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", this.peerDescription, Integer.valueOf(asInt));
            this.phase = TransactionPhase.ABORT_PROTOCOL_NEGOTIATION;
            return true;
        }
        logger.debug("Peer {} requested version {} of the Protocol. Recommending version {} instead", new Object[]{this.peerDescription, Integer.valueOf(asInt), preferredVersion});
        this.protocolVersion = preferredVersion.intValue();
        this.phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
        return true;
    }

    private ByteBuffer noMoreFlowFiles() {
        ByteBuffer allocate = ByteBuffer.allocate(1);
        allocate.put((byte) 50);
        allocate.rewind();
        this.checksum.update(50);
        return allocate;
    }

    private ByteBuffer abortProtocolNegotiation() {
        ByteBuffer allocate = ByteBuffer.allocate(1);
        allocate.put((byte) 18);
        allocate.rewind();
        return allocate;
    }

    private ByteBuffer getConnectionId() {
        logger.debug("Sending Connection ID {} to Peer {}", this.connectionId, this.peerDescription);
        ByteBuffer allocate = ByteBuffer.allocate(this.connectionId.length() + 2);
        allocate.putShort((short) this.connectionId.length());
        allocate.put(this.connectionId.getBytes(StandardCharsets.UTF_8));
        allocate.rewind();
        byte[] array = allocate.array();
        this.checksum.update(array, 0, array.length);
        this.phase = TransactionPhase.CHECK_SPACE;
        return allocate;
    }

    private ByteBuffer checkSpace() {
        logger.debug("Sending a 'Check Space' request to Peer {} to determine if there is space in the queue for more FlowFiles", this.peerDescription);
        ByteBuffer allocate = ByteBuffer.allocate(1);
        if (this.partition.isHonorBackpressure()) {
            allocate.put((byte) 97);
            this.checksum.update(97);
            this.readTimeout = System.currentTimeMillis() + this.timeoutMillis;
            this.phase = TransactionPhase.RECEIVE_SPACE_RESPONSE;
        } else {
            allocate.put((byte) 98);
            this.checksum.update(98);
            this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
        }
        allocate.rewind();
        return allocate;
    }

    private boolean receiveSpaceAvailableResponse() throws IOException {
        logger.debug("Receiving response from Peer {} to determine whether or not space is available in queue {}", this.peerDescription, this.connectionId);
        OptionalInt read = this.channel.read();
        if (!read.isPresent()) {
            if (System.currentTimeMillis() > this.readTimeout) {
                throw new SocketTimeoutException("Timed out waiting for Peer " + this.peerDescription + " to verify whether or not space is available for Connection " + this.connectionId);
            }
            return false;
        }
        int asInt = read.getAsInt();
        if (asInt < 0) {
            throw new EOFException("Encountered End-of-File when trying to verify with Peer " + this.peerDescription + " whether or not space is available in Connection " + this.connectionId);
        }
        if (asInt == 101) {
            logger.debug("Peer {} has confirmed that space is available in Connection {}", this.peerDescription, this.connectionId);
            this.phase = TransactionPhase.GET_NEXT_FLOWFILE;
            return true;
        }
        if (asInt != 102) {
            throw new TransactionAbortedException("After requesting to know whether or not Peer " + this.peerDescription + " has space available in Connection " + this.connectionId + ", received unexpected response of " + asInt + ". Aborting transaction.");
        }
        logger.debug("Peer {} has confirmed that the queue is full for Connection {}", this.peerDescription, this.connectionId);
        this.phase = TransactionPhase.RECOMMEND_PROTOCOL_VERSION;
        this.checksum.reset();
        this.sessionState = LoadBalanceSessionState.COMPLETED_SUCCESSFULLY;
        this.partition.penalize(1000L);
        return true;
    }
}
