package org.apache.nifi.remote;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.Response;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/AbstractTransaction.class */
public abstract class AbstractTransaction implements Transaction {
    // Remote party of this transaction; returned by getCommunicant() and used for all stream access.
    protected final Peer peer;
    // Whether this transaction sends or receives; send()/receive() reject calls made in the wrong direction.
    protected final TransferDirection direction;
    // When true, payload streams are wrapped in CompressionInputStream / CompressionOutputStream.
    private final boolean compress;
    // Encodes outgoing and decodes incoming DataPackets.
    protected final FlowFileCodec codec;
    // Optional sink for error events; confirm() null-checks it before use.
    protected final EventReporter eventReporter;
    // Negotiated protocol version; confirm() only enforces the CRC32 comparison when this is > 3.
    protected final int protocolVersion;
    // Penalty duration handed to peer.penalize(...) when the destination reports that it is full.
    private final int penaltyMillis;
    // Identifier handed to peer.penalize(...) together with penaltyMillis.
    protected final String destinationId;
    // Logger named after the concrete subclass (getClass()), not after this abstract base.
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // Running checksum over every byte passed through the Checked*Stream wrappers in send()/receive().
    private final CRC32 crc = new CRC32();
    // True while the peer has (or may have) another packet to read. Starts false;
    // presumably set by subclasses during their handshake -- confirm against implementations.
    protected boolean dataAvailable = false;
    // Creation timestamp (System.nanoTime()) used to compute the duration reported by complete().
    private final long creationNanoTime = System.nanoTime();
    // Number of DataPackets successfully sent or received so far.
    private int transfers = 0;
    // Sum of DataPacket.getSize() over all transferred packets.
    private long contentBytes = 0;
    // Current lifecycle state; every public operation validates it before acting.
    protected Transaction.TransactionState state = Transaction.TransactionState.TRANSACTION_STARTED;

    /**
     * Stores the collaborators and settings shared by all transaction implementations.
     *
     * @param peer              the remote party this transaction communicates with
     * @param transferDirection whether this transaction sends or receives data
     * @param z                 stored as the compression flag; when {@code true} payload streams are
     *                          wrapped in compression streams
     * @param flowFileCodec     codec used to encode and decode data packets
     * @param eventReporter     reporter for error events; may be {@code null}
     * @param i                 stored as the protocol version
     * @param i2                stored as the penalty duration (milliseconds, per the field name)
     *                          passed to {@code peer.penalize(...)}
     * @param str               stored as the destination identifier passed to {@code peer.penalize(...)}
     */
    public AbstractTransaction(Peer peer, TransferDirection transferDirection, boolean z, FlowFileCodec flowFileCodec, EventReporter eventReporter, int i, int i2, String str) {
        // Remote party and transfer direction.
        this.peer = peer;
        this.destinationId = str;
        this.direction = transferDirection;

        // Wire format.
        this.codec = flowFileCodec;
        this.compress = z;
        this.protocolVersion = i;

        // Ancillary settings.
        this.penaltyMillis = i2;
        this.eventReporter = eventReporter;
    }

    /**
     * Hook for releasing resources held by this transaction. The base implementation does nothing.
     * It is invoked from {@link #error()}, {@link #complete()} and {@link #cancel(String)}, and may
     * be invoked more than once for the same transaction (for example when a failure path calls
     * {@code error()} in addition to its own cleanup).
     *
     * @throws IOException declared so that overriding implementations can report a failure to close
     */
    protected void close() throws IOException {
    }

    /**
     * Convenience overload that wraps an in-memory payload and its attributes in a
     * {@link StandardDataPacket} and forwards it to {@link #send(DataPacket)}.
     *
     * @param bArr the payload bytes; the packet size is the array length
     * @param map  the attributes to send along with the payload
     * @throws IOException if sending the packet fails
     */
    @Override // org.apache.nifi.remote.Transaction
    public void send(byte[] bArr, Map<String, String> map) throws IOException {
        final ByteArrayInputStream payload = new ByteArrayInputStream(bArr);
        final DataPacket packet = new StandardDataPacket(map, payload, bArr.length);
        send(packet);
    }

    /**
     * Marks this transaction as failed and attempts to release its resources.
     * A failure to close is logged and never propagated to the caller.
     */
    @Override // org.apache.nifi.remote.Transaction
    public void error() {
        this.state = Transaction.TransactionState.ERROR;
        try {
            close();
        } catch (IOException closeFailure) {
            this.logger.warn("Failed to close transaction due to {}", closeFailure.getMessage());
            // The stack trace is only emitted when debug logging is switched on.
            if (!this.logger.isDebugEnabled()) {
                return;
            }
            this.logger.warn("", closeFailure);
        }
    }

    /**
     * Returns the current lifecycle state of this transaction.
     *
     * @return the current {@code TransactionState}
     */
    @Override // org.apache.nifi.remote.Transaction
    public Transaction.TransactionState getState() {
        return this.state;
    }

    /**
     * Returns the remote party of this transaction.
     *
     * @return the {@code Peer} supplied at construction time
     */
    @Override // org.apache.nifi.remote.Transaction
    public Peer getCommunicant() {
        return this.peer;
    }

    /**
     * Reads the next data packet from the peer.
     *
     * <p>After the first packet, the peer is asked via {@link #readTransactionResponse()} whether the
     * transaction continues ({@code CONTINUE_TRANSACTION}) or finishes ({@code FINISH_TRANSACTION}).
     * Every byte read is folded into the running CRC32 that {@link #confirm()} later reports.
     * On any failure the transaction is moved to the ERROR state via {@link #error()} before the
     * exception is rethrown.
     *
     * @return the next packet, or {@code null} when the peer has no more data
     * @throws IOException           if reading fails or the peer answers with an unexpected response
     * @throws IllegalStateException if the transaction is not in a state that allows receiving, or
     *                               was started as a SEND transaction
     */
    @Override // org.apache.nifi.remote.Transaction
    public final DataPacket receive() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot receive data from " + this.peer + " because Transaction State is " + this.state);
                }
                if (this.direction == TransferDirection.SEND) {
                    throw new IllegalStateException("Attempting to receive data from " + this.peer + " but started a SEND Transaction");
                }

                // Already known that nothing is left: do not touch the wire.
                if (!this.dataAvailable) {
                    return null;
                }

                // After the first packet the peer announces whether another one follows.
                if (this.transfers > 0) {
                    final Response continuation = readTransactionResponse();
                    switch (continuation.getCode()) {
                        case CONTINUE_TRANSACTION:
                            this.logger.debug("{} {} Indicates Transaction should continue", this, this.peer);
                            this.dataAvailable = true;
                            break;
                        case FINISH_TRANSACTION:
                            this.logger.debug("{} {} Indicates Transaction should finish", this, this.peer);
                            this.dataAvailable = false;
                            break;
                        default:
                            throw new ProtocolException("Got unexpected response from " + this.peer + " when asking for data: " + continuation);
                    }
                }
                if (!this.dataAvailable) {
                    return null;
                }

                this.logger.debug("{} Receiving data from {}", this, this.peer);
                // The raw session stream is a plain InputStream; it is only a CompressionInputStream
                // after being wrapped below, so it must not be declared as one.
                final InputStream rawIn = this.peer.getCommunicationsSession().getInput().getInputStream();
                final InputStream dataIn = this.compress ? new CompressionInputStream(rawIn) : rawIn;
                final DataPacket packet = this.codec.decode(new CheckedInputStream(dataIn, this.crc));
                if (packet == null) {
                    this.dataAvailable = false;
                } else {
                    this.transfers++;
                    this.contentBytes += packet.getSize();
                }
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
                return packet;
            } catch (IOException e) {
                throw new IOException("Failed to receive data from " + this.peer + " due to " + e, e);
            }
        } catch (final Exception e) {
            error();
            throw e;
        }
    }

    /**
     * Reads the next protocol response from the peer. Callers in this class inspect the returned
     * response's code and, in {@link #confirm()}, its message.
     *
     * @return the response read from the peer
     * @throws IOException if the response cannot be read
     */
    protected abstract Response readTransactionResponse() throws IOException;

    /**
     * Sends a response code to the peer without an accompanying message
     * (delegates with a {@code null} message).
     *
     * @param responseCode the code to send
     * @throws IOException if the response cannot be written
     */
    protected final void writeTransactionResponse(ResponseCode responseCode) throws IOException {
        writeTransactionResponse(responseCode, null);
    }

    /**
     * Sends a response code and message to the peer, delegating to the three-argument overload
     * with {@code true} as the final argument.
     *
     * @param responseCode the code to send
     * @param str          the message accompanying the code; may be {@code null}
     * @throws IOException if the response cannot be written
     */
    protected void writeTransactionResponse(ResponseCode responseCode, String str) throws IOException {
        writeTransactionResponse(responseCode, str, true);
    }

    /**
     * Writes a response code and optional message to the peer.
     *
     * @param responseCode the code to send
     * @param str          the message accompanying the code; may be {@code null}
     * @param z            NOTE(review): meaning not visible in this class; send() passes
     *                     {@code false} for CONTINUE_TRANSACTION and every other caller passes
     *                     {@code true} -- presumably a flush flag, confirm against implementations
     * @throws IOException if the response cannot be written
     */
    protected abstract void writeTransactionResponse(ResponseCode responseCode, String str, boolean z) throws IOException;

    /**
     * Confirms the transaction with the peer by exchanging CRC32 checksums of the transferred data.
     *
     * <p>For a RECEIVE transaction that never had data available, the state moves straight to
     * TRANSACTION_CONFIRMED without any network traffic. Otherwise the transaction must be in state
     * DATA_EXCHANGED:
     * <ul>
     *   <li>When sending, FINISH_TRANSACTION is written, the peer's CONFIRM_TRANSACTION (carrying its
     *       checksum) is read and, for protocol versions above 3, compared with the local checksum.
     *       A mismatch, including a missing checksum, is answered with BAD_CHECKSUM and reported as
     *       an {@code IOException}.</li>
     *   <li>When receiving, the local checksum is sent with CONFIRM_TRANSACTION and the peer's
     *       verdict is read back.</li>
     * </ul>
     * On any failure the transaction is moved to the ERROR state via {@link #error()}.
     *
     * @throws IOException           if communication fails, checksums differ, or the peer answers
     *                               with an unexpected response
     * @throws IllegalStateException if the transaction is not in a confirmable state, or a RECEIVE
     *                               transaction still has unread data
     */
    @Override // org.apache.nifi.remote.Transaction
    public final void confirm() throws IOException {
        try {
            try {
                // Nothing was ever offered to us: there is nothing to checksum or acknowledge.
                if (this.state == Transaction.TransactionState.TRANSACTION_STARTED && !this.dataAvailable && this.direction == TransferDirection.RECEIVE) {
                    this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                    return;
                }
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED) {
                    throw new IllegalStateException("Cannot confirm Transaction because state is " + this.state + "; Transaction can only be confirmed when state is " + Transaction.TransactionState.DATA_EXCHANGED);
                }
                // NOTE(review): result is unused; kept because it cannot be seen from here whether
                // the call has side effects -- confirm before removing.
                this.peer.getCommunicationsSession();

                if (this.direction == TransferDirection.RECEIVE) {
                    if (this.dataAvailable) {
                        throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed.");
                    }
                    this.logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, this.peer);
                    writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, String.valueOf(this.crc.getValue()));
                    try {
                        final Response verdict = readTransactionResponse();
                        this.logger.trace("{} Received {} from {}", new Object[]{this, verdict, this.peer});
                        switch (verdict.getCode()) {
                            case CONFIRM_TRANSACTION:
                                this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                                break;
                            case BAD_CHECKSUM:
                                throw new IOException(this + " Received a BadChecksum response from peer " + this.peer);
                            default:
                                throw new ProtocolException(this + " Received unexpected Response from peer " + this.peer + " : " + verdict + "; expected 'Confirm Transaction' Response Code");
                        }
                    } catch (IOException e) {
                        this.logger.error("Failed to receive response code from {} when expecting confirmation of transaction", this.peer);
                        if (this.eventReporter != null) {
                            this.eventReporter.reportEvent(Severity.ERROR, "Site-to-Site", "Failed to receive response code from " + this.peer + " when expecting confirmation of transaction");
                        }
                        throw e;
                    }
                } else {
                    this.logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, this.peer);
                    writeTransactionResponse(ResponseCode.FINISH_TRANSACTION);
                    final String calculatedCrc = String.valueOf(this.crc.getValue());
                    final Response confirmation = readTransactionResponse();
                    if (confirmation.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
                        throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + this.peer + " but received " + confirmation);
                    }
                    this.logger.trace("{} Received {} from {}", new Object[]{this, confirmation, this.peer});
                    final String receivedCrc = confirmation.getMessage();
                    // Compare from the locally computed (never null) side so that a confirmation
                    // without a checksum is treated as a mismatch rather than causing an NPE.
                    if (this.protocolVersion > 3 && !calculatedCrc.equals(receivedCrc)) {
                        writeTransactionResponse(ResponseCode.BAD_CHECKSUM);
                        throw new IOException(this + " Sent data to peer " + this.peer + " but calculated CRC32 Checksum as " + calculatedCrc + " while peer calculated CRC32 Checksum as " + receivedCrc + "; canceling transaction and rolling back session");
                    }
                    writeTransactionResponse(ResponseCode.CONFIRM_TRANSACTION, "");
                    this.state = Transaction.TransactionState.TRANSACTION_CONFIRMED;
                }
            } catch (final Exception e) {
                error();
                throw e;
            }
        } catch (IOException e) {
            throw new IOException("Failed to confirm transaction with " + this.peer + " due to " + e, e);
        }
    }

    /**
     * Completes a confirmed transaction and releases its resources.
     *
     * <p>When receiving, TRANSACTION_FINISHED is sent to the peer (skipped entirely if no packet was
     * ever received). When sending, the peer's final response is read; a
     * TRANSACTION_FINISHED_BUT_DESTINATION_FULL response penalizes the peer and is reported through
     * the first constructor argument of the returned completion.
     *
     * <p>{@link #close()} runs in a {@code finally} block on every path out of the completion logic.
     * On any failure the transaction is additionally moved to the ERROR state via {@link #error()}.
     *
     * @return a summary holding the destination-full flag, number of transfers, content bytes and
     *         elapsed nanoseconds since this transaction was created
     * @throws IOException           if communication with the peer fails or it answers unexpectedly
     * @throws IllegalStateException if the transaction has not been confirmed
     */
    @Override // org.apache.nifi.remote.Transaction
    public final TransactionCompletion complete() throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.TRANSACTION_CONFIRMED) {
                    throw new IllegalStateException("Cannot complete transaction with " + this.peer + " because state is " + this.state + "; Transaction can only be completed when state is " + Transaction.TransactionState.TRANSACTION_CONFIRMED);
                }

                boolean destinationFull = false;
                if (this.direction == TransferDirection.RECEIVE) {
                    if (this.transfers == 0) {
                        // Nothing was received, so the peer is not waiting for a final indicator.
                        this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                        return new ClientTransactionCompletion(false, 0, 0L, System.nanoTime() - this.creationNanoTime);
                    }
                    this.logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, this.peer);
                    writeTransactionResponse(ResponseCode.TRANSACTION_FINISHED);
                    this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                } else {
                    try {
                        final Response finished = readTransactionResponse();
                        this.logger.debug("{} Received {} from {}", new Object[]{this, finished, this.peer});
                        if (finished.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                            this.peer.penalize(this.destinationId, this.penaltyMillis);
                            destinationFull = true;
                        } else if (finished.getCode() != ResponseCode.TRANSACTION_FINISHED) {
                            throw new ProtocolException("After sending data to " + this.peer + ", expected TRANSACTION_FINISHED response but got " + finished);
                        }
                        this.state = Transaction.TransactionState.TRANSACTION_COMPLETED;
                    } catch (IOException e) {
                        throw new IOException(this + " Failed to receive a response from " + this.peer + " when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data. " + e, e);
                    }
                }
                return new ClientTransactionCompletion(destinationFull, this.transfers, this.contentBytes, System.nanoTime() - this.creationNanoTime);
            } catch (IOException e) {
                throw new IOException("Failed to complete transaction with " + this.peer + " due to " + e, e);
            } finally {
                // A real finally block: a close() failure on the success path is no longer
                // mis-wrapped as a completion failure and does not trigger an extra close() call.
                close();
            }
        } catch (final Exception e) {
            error();
            throw e;
        }
    }

    /**
     * Cancels this transaction by sending CANCEL_TRANSACTION to the peer, then releases resources.
     *
     * <p>If the cancel message cannot be sent, the transaction is moved to the ERROR state via
     * {@link #error()} and the failure is rethrown with context. {@link #close()} runs in a
     * {@code finally} block once the cancel attempt has been made, so a failure while closing is
     * reported as itself rather than as a failure to send the cancel message.
     *
     * @param str explanation sent to the peer; {@code null} is replaced by a placeholder text
     * @throws IOException           if the cancel message cannot be sent or closing fails
     * @throws IllegalStateException if the transaction is already canceled, completed or in error
     */
    @Override // org.apache.nifi.remote.Transaction
    public final void cancel(String str) throws IOException {
        // Terminal states cannot be canceled; nothing is sent and nothing is closed in that case.
        if (this.state == Transaction.TransactionState.TRANSACTION_CANCELED
                || this.state == Transaction.TransactionState.TRANSACTION_COMPLETED
                || this.state == Transaction.TransactionState.ERROR) {
            throw new IllegalStateException("Cannot cancel transaction because state is already " + this.state);
        }

        try {
            writeTransactionResponse(ResponseCode.CANCEL_TRANSACTION, str == null ? "<No explanation given>" : str);
            this.state = Transaction.TransactionState.TRANSACTION_CANCELED;
        } catch (IOException e) {
            error();
            throw new IOException("Failed to send 'cancel transaction' message to " + this.peer + " due to " + e, e);
        } finally {
            close();
        }
    }

    /**
     * Describes this transaction by concrete class name, peer URL, transfer direction and state.
     *
     * @return a string of the form {@code SimpleName[Url=..., TransferDirection=..., State=...]}
     */
    @Override
    public final String toString() {
        final StringBuilder description = new StringBuilder(getClass().getSimpleName());
        description.append("[Url=").append(this.peer.getUrl());
        description.append(", TransferDirection=").append(this.direction);
        description.append(", State=").append(this.state);
        description.append("]");
        return description.toString();
    }

    /**
     * Sends one data packet to the peer.
     *
     * <p>Every packet after the first is preceded by a CONTINUE_TRANSACTION indicator. Every byte
     * written is folded into the running CRC32 that {@link #confirm()} later compares with the
     * peer's checksum. On any failure the transaction is moved to the ERROR state via
     * {@link #error()} before the exception is rethrown.
     *
     * @param dataPacket the packet to encode and send
     * @throws IOException           if writing to the peer fails
     * @throws IllegalStateException if the transaction is not in a state that allows sending, or
     *                               was started as a RECEIVE transaction
     */
    @Override // org.apache.nifi.remote.Transaction
    public final void send(DataPacket dataPacket) throws IOException {
        try {
            try {
                if (this.state != Transaction.TransactionState.DATA_EXCHANGED && this.state != Transaction.TransactionState.TRANSACTION_STARTED) {
                    throw new IllegalStateException("Cannot send data to " + this.peer + " because Transaction State is " + this.state);
                }
                if (this.direction == TransferDirection.RECEIVE) {
                    throw new IllegalStateException("Attempting to send data to " + this.peer + " but started a RECEIVE Transaction");
                }

                // Tell the peer another packet follows.
                if (this.transfers > 0) {
                    writeTransactionResponse(ResponseCode.CONTINUE_TRANSACTION, null, false);
                }

                this.logger.debug("{} Sending data to {}", this, this.peer);
                // The raw session stream is a plain OutputStream; it is only a CompressionOutputStream
                // after being wrapped below, so it must not be declared as one.
                final OutputStream rawOut = this.peer.getCommunicationsSession().getOutput().getOutputStream();
                final OutputStream dataOut = this.compress ? new CompressionOutputStream(rawOut) : rawOut;
                final OutputStream checkedOut = new CheckedOutputStream(dataOut, this.crc);
                this.codec.encode(dataPacket, checkedOut);

                // Only the compression wrapper is closed here; the uncompressed session stream is
                // left open. NOTE(review): presumably closing forces the compressor to emit its
                // remaining bytes without closing the underlying stream -- confirm against
                // CompressionOutputStream.
                if (this.compress) {
                    checkedOut.close();
                }

                this.transfers++;
                this.contentBytes += dataPacket.getSize();
                this.state = Transaction.TransactionState.DATA_EXCHANGED;
            } catch (IOException e) {
                throw new IOException("Failed to send data to " + this.peer + " due to " + e, e);
            }
        } catch (final Exception e) {
            error();
            throw e;
        }
    }
}
