package org.apache.nifi.controller.queue.clustered.server;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import java.util.zip.GZIPInputStream;
import javax.net.ssl.SSLSocket;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.class */
public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
    private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class);
    private static final int SOCKET_CLOSED = -1;
    private static final int NO_DATA_AVAILABLE = 0;
    private final FlowFileRepository flowFileRepository;
    private final ContentRepository contentRepository;
    private final ProvenanceRepository provenanceRepository;
    private final FlowController flowController;
    private final LoadBalanceAuthorizer authorizer;
    private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>();
    private final AtomicLong lineageStartIndex = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol$ContentClaimTriple.class */
    /**
     * Immutable value holder describing where the content of a received FlowFile was
     * written: the Content Claim, the offset of the content within that claim, and the
     * number of content bytes.
     */
    public static class ContentClaimTriple {
        private final ContentClaim contentClaim;
        private final long claimOffset;
        private final long contentLength;

        /**
         * @param contentClaim the claim holding the content; {@code null} is passed by
         *                     this file when a FlowFile has no content
         * @param j            offset of the content within the claim
         * @param j2           length of the content in bytes
         */
        public ContentClaimTriple(ContentClaim contentClaim, long j, long j2) {
            this.contentClaim = contentClaim;
            claimOffset = j;
            contentLength = j2;
        }

        /** @return the claim holding the content, possibly {@code null} */
        public ContentClaim getContentClaim() {
            return contentClaim;
        }

        /** @return the offset of the content within the claim */
        public long getClaimOffset() {
            return claimOffset;
        }

        /** @return the length of the content in bytes */
        public long getContentLength() {
            return contentLength;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol$RemoteFlowFileRecord.class */
    /**
     * Pairs a locally created FlowFile record with the UUID that the FlowFile carried
     * on the sending node.
     */
    public static class RemoteFlowFileRecord {
        private final String remoteUuid;
        private final FlowFileRecord flowFile;

        /**
         * @param str            UUID of the FlowFile as reported by the peer
         * @param flowFileRecord the FlowFile record created on this node
         */
        public RemoteFlowFileRecord(String str, FlowFileRecord flowFileRecord) {
            remoteUuid = str;
            flowFile = flowFileRecord;
        }

        /** @return the UUID of the FlowFile as reported by the peer */
        public String getRemoteUuid() {
            return remoteUuid;
        }

        /** @return the FlowFile record created on this node */
        public FlowFileRecord getFlowFile() {
            return flowFile;
        }
    }

    /**
     * Creates a protocol handler that stores received data in the given repositories.
     *
     * @param flowFileRepository    repository updated with the received FlowFiles
     * @param contentRepository     repository that the received content is written to
     * @param provenanceRepository  repository that RECEIVE / DROP events are registered with
     * @param flowController        used to look up the destination Connection by ID
     * @param loadBalanceAuthorizer authorizes peers that connect over an {@link SSLSocket}
     */
    public StandardLoadBalanceProtocol(
            FlowFileRepository flowFileRepository,
            ContentRepository contentRepository,
            ProvenanceRepository provenanceRepository,
            FlowController flowController,
            LoadBalanceAuthorizer loadBalanceAuthorizer) {
        this.authorizer = loadBalanceAuthorizer;
        this.flowController = flowController;
        this.provenanceRepository = provenanceRepository;
        this.contentRepository = contentRepository;
        this.flowFileRepository = flowFileRepository;
    }

    /**
     * Entry point for a newly accepted connection: authorizes the peer when the socket
     * is an {@link SSLSocket}, negotiates the protocol version and, if the peer has data
     * to send, receives a transaction of FlowFiles.
     *
     * @param socket       the accepted socket; closed here only when the peer closed its end
     *                     during version negotiation
     * @param inputStream  stream to read the peer's data from
     * @param outputStream stream to write responses to
     * @throws IOException if communication with the peer fails
     */
    @Override
    public void receiveFlowFiles(Socket socket, InputStream inputStream, OutputStream outputStream) throws IOException {
        String peerDescription = socket.getInetAddress().getHostName();
        String channelDescription = socket.getLocalSocketAddress() + "::" + socket.getRemoteSocketAddress();

        if (socket instanceof SSLSocket) {
            logger.debug("Connection received from peer {}", peerDescription);
            // From here on the peer is described by whatever the authorizer returns.
            peerDescription = this.authorizer.authorize((SSLSocket) socket);
            channelDescription = peerDescription + "::" + channelDescription;
            logger.debug("Client Identities are authorized to load balance data for peer {}", peerDescription);
        }

        final int protocolVersion = negotiateProtocolVersion(inputStream, outputStream, peerDescription, channelDescription);

        if (protocolVersion == SOCKET_CLOSED) {
            socket.close();
            return;
        }

        if (protocolVersion == NO_DATA_AVAILABLE) {
            logger.debug("No data is available from {}", peerDescription);
            return;
        }

        receiveFlowFiles(inputStream, outputStream, peerDescription, protocolVersion);
    }

    /**
     * Negotiates the Load Balance Protocol version with the peer.
     *
     * <p>Each round reads one byte holding the version the peer proposes. A supported
     * version is acknowledged with response code 16 and returned. Otherwise the preferred
     * alternative is sent back after response code 17 and another round starts; when no
     * alternative exists, response code 18 is sent and the negotiation is aborted.
     *
     * @param inputStream  stream to read the proposed version from
     * @param outputStream stream to write the response codes to
     * @param str          description of the peer, used in messages
     * @param str2         description of the channel, used in messages
     * @return the agreed version, {@code SOCKET_CLOSED} when the peer closed the stream,
     *         or {@code NO_DATA_AVAILABLE} when the first read timed out
     * @throws IOException if no acceptable version exists, if a timeout occurs after at
     *                     least one counter-proposal was sent, or on any other I/O failure
     */
    protected int negotiateProtocolVersion(InputStream inputStream, OutputStream outputStream, String str, String str2) throws IOException {
        final StandardVersionNegotiator negotiator = new StandardVersionNegotiator(new int[]{1});
        int counterProposalsSent = 0;

        for (;;) {
            try {
                final int requestedVersion = inputStream.read();
                if (requestedVersion < 0) {
                    logger.debug("Encountered End-of-File when receiving the the recommended Protocol Version. Returning -1 for the protocol version");
                    return SOCKET_CLOSED;
                }

                if (negotiator.isVersionSupported(requestedVersion)) {
                    logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. Accepting version.", str, requestedVersion, str2);
                    outputStream.write(16);
                    outputStream.flush();
                    return requestedVersion;
                }

                final Integer alternative = negotiator.getPreferredVersion(requestedVersion);
                if (alternative == null) {
                    logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. This version is not acceptable. Aborting communications.", str, requestedVersion, str2);
                    outputStream.write(18);
                    outputStream.flush();
                    throw new IOException("Peer " + str + " requested that we use version " + requestedVersion + " of the Load Balance Protocol over Channel " + str2 + ", but this version is unacceptable. Aborted communications.");
                }

                logger.debug("Peer {} requested version {} of the Load Balance Protocol over Channel {}. Requesting that peer change to version {} instead.", str, requestedVersion, str2, alternative);
                outputStream.write(17);
                outputStream.write(alternative.intValue());
                outputStream.flush();
                counterProposalsSent++;
            } catch (SocketTimeoutException timeout) {
                // A timeout before any counter-proposal means the peer simply has nothing to
                // send; once a negotiation is under way, a timeout is a real failure.
                if (counterProposalsSent == 0) {
                    logger.debug("SocketTimeoutException thrown when trying to negotiate Protocol Version");
                    return NO_DATA_AVAILABLE;
                }
                throw timeout;
            }
        }
    }

    /**
     * Receives one transaction of FlowFiles from a peer after the protocol version has
     * been negotiated.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read the Connection ID; an end-of-stream here means the peer has nothing to send.</li>
     *   <li>Resolve the Connection and require its queue to be a {@link LoadBalancedFlowFileQueue}.</li>
     *   <li>Read the space-check byte: 97 is answered with QUEUE_FULL (and the method returns)
     *       or SPACE_AVAILABLE; 98 proceeds without a check; anything else aborts.</li>
     *   <li>Receive FlowFiles, writing all content into a single Content Claim.</li>
     *   <li>Verify the checksum and complete the transaction.</li>
     * </ol>
     *
     * <p>Every byte read through the data stream updates a CRC32 that is later compared
     * with the checksum the peer sends. If anything fails once FlowFiles are being
     * received, the claimant counts taken for the received FlowFiles are released and
     * the Content Claim is removed before the exception is rethrown.
     *
     * @param inputStream  stream to read the peer's data from
     * @param outputStream stream to write responses to
     * @param str          description of the peer, used in messages
     * @param i            negotiated protocol version
     * @throws IOException if communication fails, the peer violates the protocol, or the
     *                     transaction is aborted
     */
    protected void receiveFlowFiles(InputStream inputStream, OutputStream outputStream, String str, int i) throws IOException {
        logger.debug("Receiving FlowFiles from {}", str);
        final long startTimestamp = System.currentTimeMillis();

        final CRC32 checksum = new CRC32();
        final DataInputStream dataIn = new DataInputStream(new CheckedInputStream(inputStream, checksum));

        final String connectionId = getConnectionID(dataIn, str);
        if (connectionId == null) {
            logger.debug("Received no Connection ID from Peer {}. Will consider receipt of FlowFiles complete", str);
            return;
        }

        final Connection connection = this.flowController.getFlowManager().getConnection(connectionId);
        if (connection == null) {
            logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", str, connectionId);
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + str + " for Connection with ID " + connectionId + " but no Connection exists with that ID");
        }

        final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
        if (!(flowFileQueue instanceof LoadBalancedFlowFileQueue)) {
            throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + str + " for Connection with ID " + connectionId + " but the Connection with that ID is not configured to allow for Load Balancing");
        }
        final LoadBalancedFlowFileQueue loadBalancedQueue = (LoadBalancedFlowFileQueue) flowFileQueue;

        final int spaceCheck = dataIn.read();
        if (spaceCheck < 0) {
            throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + str);
        }

        if (spaceCheck == 97) {
            if (loadBalancedQueue.isLocalPartitionFull()) {
                logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", str, connectionId);
                outputStream.write(LoadBalanceProtocolConstants.QUEUE_FULL);
                outputStream.flush();
                return;
            }

            logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with SPACE_AVAILABLE", str, connectionId);
            outputStream.write(LoadBalanceProtocolConstants.SPACE_AVAILABLE);
            outputStream.flush();
        } else if (spaceCheck != 98) {
            throw new TransactionAbortedException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + str + " but instead received value " + spaceCheck);
        }

        final LoadBalanceCompression compression = connection.getFlowFileQueue().getLoadBalanceCompression();
        logger.debug("Receiving FlowFiles from Peer {} for Connection {}; Compression = {}", str, connectionId, compression);

        ContentClaim contentClaim = null;
        OutputStream contentClaimOut = null;
        final List<RemoteFlowFileRecord> flowFilesReceived = new ArrayList<>();
        long claimOffset = 0L;

        try {
            try {
                while (isMoreFlowFiles(dataIn, i)) {
                    // All FlowFiles of the transaction share one claim, created lazily.
                    if (contentClaim == null) {
                        contentClaim = this.contentRepository.create(false);
                        contentClaimOut = this.contentRepository.write(contentClaim);
                    }

                    final RemoteFlowFileRecord received = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, i, str, compression);
                    this.contentRepository.incrementClaimaintCount(received.getFlowFile().getContentClaim());
                    flowFilesReceived.add(received);
                    claimOffset += received.getFlowFile().getSize();
                }
            } finally {
                // Close the claim's stream on every path, including a failed indicator read.
                if (contentClaimOut != null) {
                    contentClaimOut.close();
                }
            }

            // Release one claimant on the shared claim now that each received FlowFile has
            // had the count on its own claim incremented; the claim is removed below only
            // if this brings the count to zero.
            final int remainingClaimants = this.contentRepository.decrementClaimantCount(contentClaim);

            verifyChecksum(checksum, inputStream, outputStream, str, flowFilesReceived.size());
            completeTransaction(inputStream, outputStream, str, flowFilesReceived, connectionId, startTimestamp, loadBalancedQueue);

            if (remainingClaimants == 0) {
                this.contentRepository.remove(contentClaim);
            }
        } catch (final Exception e) {
            // The transaction failed: the received data is no longer needed, so release the
            // claimant count held for each FlowFile and discard the claim. This also covers
            // checksum and transaction-completion failures.
            for (final RemoteFlowFileRecord received : flowFilesReceived) {
                this.contentRepository.decrementClaimantCount(received.getFlowFile().getContentClaim());
            }
            this.contentRepository.remove(contentClaim);
            throw e;
        }

        logger.debug("Successfully received {} FlowFiles from Peer {} to Load Balance for Connection {}", flowFilesReceived.size(), str, connectionId);
    }

    /**
     * Reads the peer's transaction completion indicator and, when the peer asks to
     * complete (35), persists the received FlowFiles and hands them to the queue.
     *
     * <p>An indicator of 36 means the peer aborted; any other value is answered with 36
     * and treated as an error. On completion, provenance RECEIVE events are registered
     * and the FlowFile Repository is updated before the FlowFiles are transferred to the
     * queue; a successful transfer is confirmed with 37. If the queue rejects the
     * transfer, 36 is sent and the repositories are cleaned up on a best-effort basis;
     * no exception is thrown in that case.
     *
     * @param inputStream               stream to read the indicator from
     * @param outputStream              stream to write the response to
     * @param str                       description of the peer, used in messages
     * @param list                      the FlowFiles received in this transaction
     * @param str2                      ID of the Connection the FlowFiles belong to
     * @param j                         time in milliseconds at which receipt started
     * @param loadBalancedFlowFileQueue queue that the FlowFiles are transferred to
     * @throws IOException if the stream ends, the peer aborts, or an unexpected indicator arrives
     */
    private void completeTransaction(InputStream inputStream, OutputStream outputStream, String str, List<RemoteFlowFileRecord> list, String str2, long j, LoadBalancedFlowFileQueue loadBalancedFlowFileQueue) throws IOException {
        final int indicator = inputStream.read();
        if (indicator < 0) {
            throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + str + " but encountered EOF");
        }

        switch (indicator) {
            case 36:
                throw new TransactionAbortedException("Peer " + str + " chose to Abort Load Balance Transaction");
            case 35:
                break;
            default:
                logger.debug("Expected to receive Transaction Completion Indicator from Peer " + str + " but instead received a value of " + indicator + ". Sending back an Abort Transaction Flag.");
                outputStream.write(36);
                outputStream.flush();
                throw new IOException("Expected to receive Transaction Completion Indicator from Peer " + str + " but instead received a value of " + indicator);
        }

        logger.debug("Received Complete Transaction indicator from Peer {}", str);
        registerReceiveProvenanceEvents(list, str, str2, j);
        updateFlowFileRepository(list, loadBalancedFlowFileQueue);

        try {
            transferFlowFilesToQueue(list, loadBalancedFlowFileQueue);
            outputStream.write(37);
            outputStream.flush();
        } catch (IllegalClusterStateException rejected) {
            logger.error("Failed to transferred received data into FlowFile Queue {}", loadBalancedFlowFileQueue, rejected);
            outputStream.write(36);
            outputStream.flush();

            // The FlowFiles were already recorded in the repositories above, so undo that.
            // A failure here is only logged.
            try {
                cleanupRepositoriesOnTransferFailure(list, loadBalancedFlowFileQueue, "Rejected transfer due to " + rejected.getMessage());
            } catch (Exception cleanupFailure) {
                logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", cleanupFailure);
            }
        }
    }

    /**
     * Undoes the repository updates for FlowFiles that could not be transferred to the
     * queue: marks them for deletion in the FlowFile Repository, then registers DROP
     * provenance events for them.
     *
     * @param list          the FlowFiles that could not be transferred
     * @param flowFileQueue the queue the FlowFiles were destined for
     * @param str           details recorded on the DROP events
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void cleanupRepositoriesOnTransferFailure(List<RemoteFlowFileRecord> list, FlowFileQueue flowFileQueue, String str) throws IOException {
        dropFlowFilesFromRepository(list, flowFileQueue);

        final String queueIdentifier = flowFileQueue.getIdentifier();
        reportDropEvents(list, queueIdentifier, str);
    }

    /**
     * Updates the FlowFile Repository with one delete-marked record per given FlowFile.
     *
     * @param list          the FlowFiles to drop
     * @param flowFileQueue the queue used as both origin and destination of each record
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void dropFlowFilesFromRepository(List<RemoteFlowFileRecord> list, FlowFileQueue flowFileQueue) throws IOException {
        // Raw List on purpose: the element type that updateRepository expects is not
        // imported by this file.
        final List deleteRecords = list.stream().map(remoteFlowFile -> {
            final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
            record.setDestination(flowFileQueue);
            record.markForDelete();
            return record;
        }).collect(Collectors.toList());

        this.flowFileRepository.updateRepository(deleteRecords);

        // Log the number of records, not the records themselves: the placeholder reads
        // as a count, and the matching provenance message also logs a count.
        logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", deleteRecords.size());
    }

    /**
     * Registers a DROP provenance event for each given FlowFile.
     *
     * @param list the FlowFiles that were dropped
     * @param str  component ID recorded on the events
     * @param str2 details recorded on the events
     */
    private void reportDropEvents(List<RemoteFlowFileRecord> list, String str, String str2) {
        // Raw List on purpose: the event type returned by build() is not imported by this file.
        final List dropEvents = new ArrayList(list.size());

        for (final RemoteFlowFileRecord remoteFlowFile : list) {
            final FlowFileRecord flowFile = remoteFlowFile.getFlowFile();
            final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
                    .fromFlowFile(flowFile)
                    .setEventType(ProvenanceEventType.DROP)
                    .setComponentId(str)
                    .setComponentType("Load Balanced Connection")
                    .setDetails(str2);

            final ContentClaim claim = flowFile.getContentClaim();
            if (claim != null) {
                final ResourceClaim resource = claim.getResourceClaim();
                // Absolute offset = offset of the claim + offset of the FlowFile within the claim.
                final long absoluteOffset = claim.getOffset() + flowFile.getContentClaimOffset();
                eventBuilder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), Long.valueOf(absoluteOffset), flowFile.getSize());
            }

            dropEvents.add(eventBuilder.build());
        }

        // NOTE(review): this message is emitted before the events are actually registered.
        logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue", Integer.valueOf(dropEvents.size()));
        this.provenanceRepository.registerEvents(dropEvents);
    }

    /**
     * Registers a RECEIVE provenance event for each FlowFile received from the peer.
     *
     * @param list the received FlowFiles together with their remote UUIDs
     * @param str  description of the peer; becomes part of the transit URI
     * @param str2 ID of the Connection; used as component ID and in the transit URI
     * @param j    time in milliseconds at which receipt started; the event duration is
     *             the time elapsed since then
     */
    private void registerReceiveProvenanceEvents(List<RemoteFlowFileRecord> list, String str, String str2, long j) {
        final long elapsedMillis = System.currentTimeMillis() - j;

        // Raw List on purpose: the event type returned by build() is not imported by this file.
        final List receiveEvents = new ArrayList(list.size());

        for (final RemoteFlowFileRecord remoteFlowFile : list) {
            final FlowFileRecord flowFile = remoteFlowFile.getFlowFile();
            final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
                    .fromFlowFile(flowFile)
                    .setEventType(ProvenanceEventType.RECEIVE)
                    .setTransitUri("nifi://" + str + "/loadbalance/" + str2)
                    .setSourceSystemFlowFileIdentifier(remoteFlowFile.getRemoteUuid())
                    .setEventDuration(elapsedMillis)
                    .setComponentId(str2)
                    .setComponentType("Load Balanced Connection");

            final ContentClaim claim = flowFile.getContentClaim();
            if (claim != null) {
                final ResourceClaim resource = claim.getResourceClaim();
                // Absolute offset = offset of the claim + offset of the FlowFile within the claim.
                final long absoluteOffset = claim.getOffset() + flowFile.getContentClaimOffset();
                eventBuilder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), Long.valueOf(absoluteOffset), flowFile.getSize());
            }

            receiveEvents.add(eventBuilder.build());
        }

        this.provenanceRepository.registerEvents(receiveEvents);
    }

    /**
     * Updates the FlowFile Repository with one record per received FlowFile, each with
     * the given queue as its destination.
     *
     * @param list          the received FlowFiles
     * @param flowFileQueue the queue used as both origin and destination of each record
     * @throws IOException if the FlowFile Repository cannot be updated
     */
    private void updateFlowFileRepository(List<RemoteFlowFileRecord> list, FlowFileQueue flowFileQueue) throws IOException {
        // Raw List on purpose: the element type that updateRepository expects is not
        // imported by this file.
        final List repositoryRecords = (List) list.stream()
                .map(remoteFlowFile -> {
                    final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
                    record.setDestination(flowFileQueue);
                    return record;
                })
                .collect(Collectors.toList());

        this.flowFileRepository.updateRepository(repositoryRecords);
    }

    /**
     * Hands the received FlowFiles to the load-balanced queue.
     *
     * @param list                      the received FlowFiles
     * @param loadBalancedFlowFileQueue the queue that receives them
     * @throws IllegalClusterStateException propagated from the queue's {@code receiveFromPeer}
     */
    private void transferFlowFilesToQueue(List<RemoteFlowFileRecord> list, LoadBalancedFlowFileQueue loadBalancedFlowFileQueue) throws IllegalClusterStateException {
        final List<FlowFileRecord> flowFiles = list.stream()
                .map(RemoteFlowFileRecord::getFlowFile)
                .collect(Collectors.toList());

        loadBalancedFlowFileQueue.receiveFromPeer(flowFiles);
    }

    /**
     * Reads the checksum sent by the peer and compares it with the locally calculated
     * one. A match is confirmed with response code 33; a mismatch is answered with 34
     * and aborts the transaction.
     *
     * @param checksum     the locally calculated checksum
     * @param inputStream  stream to read the peer's checksum from
     * @param outputStream stream to write the response code to
     * @param str          description of the peer, used in messages
     * @param i            number of FlowFiles received, used in the error message
     * @throws IOException if the checksum cannot be read or does not match
     */
    private void verifyChecksum(Checksum checksum, InputStream inputStream, OutputStream outputStream, String str, int i) throws IOException {
        final long peerChecksum = readChecksum(inputStream);
        final long calculatedChecksum = checksum.getValue();

        if (calculatedChecksum != peerChecksum) {
            logger.error("Received {} FlowFiles from peer {} but the Checksum reported by the peer ({}) did not match the checksum that was calculated ({}). Will reject the transaction.", i, str, peerChecksum, calculatedChecksum);
            outputStream.write(34);
            outputStream.flush();
            throw new TransactionAbortedException("Transaction with Peer " + str + " was aborted because the calculated checksum did not match the checksum provided by peer.");
        }

        logger.debug("Checksum from Peer {} matched the checksum that was calculated. Writing confirmation.", str);
        outputStream.write(33);
        outputStream.flush();
    }

    /**
     * Reads the 8-byte checksum sent by the peer and decodes it with
     * {@link ByteBuffer}'s default (big-endian) byte order.
     *
     * @param inputStream stream to read the checksum from
     * @return the checksum reported by the peer
     * @throws IOException if the checksum cannot be read
     */
    private long readChecksum(InputStream inputStream) throws IOException {
        final int checksumLength = Long.BYTES;
        final byte[] buffer = getDataBuffer();

        // Only the first 8 bytes of the shared buffer are filled and decoded.
        StreamUtils.read(inputStream, buffer, checksumLength);
        return ByteBuffer.wrap(buffer, 0, checksumLength).getLong();
    }

    /**
     * Returns the calling thread's reusable 69632-byte buffer, allocating it on first use.
     *
     * @return the buffer held in the {@code dataBuffer} thread-local for the current thread
     */
    private byte[] getDataBuffer() {
        final byte[] existing = this.dataBuffer.get();
        if (existing != null) {
            return existing;
        }

        final byte[] created = new byte[69632];
        this.dataBuffer.set(created);
        return created;
    }

    /**
     * Reads the Connection ID that starts a transaction.
     *
     * @param dataInputStream stream to read the ID from, via {@link DataInputStream#readUTF()}
     * @param str             description of the peer, used in the log message
     * @return the Connection ID, or {@code null} if the stream ended before one was read
     * @throws IOException on any I/O failure other than end-of-stream
     */
    private String getConnectionID(DataInputStream dataInputStream, String str) throws IOException {
        String connectionId;
        try {
            connectionId = dataInputStream.readUTF();
        } catch (EOFException endOfStream) {
            // End-of-stream here is not an error; it is reported to the caller as null.
            logger.debug("Encountered EOFException when trying to receive Connection ID from Peer {}. Returning null for Connection ID", str);
            connectionId = null;
        }
        return connectionId;
    }

    /**
     * Reads the indicator byte that tells whether another FlowFile follows in the
     * transaction: 49 means more FlowFiles, 50 means no more.
     *
     * @param dataInputStream stream to read the indicator from
     * @param i               negotiated protocol version (not used by this method)
     * @return {@code true} if another FlowFile follows, {@code false} otherwise
     * @throws IOException if the stream ends or an unexpected indicator is received
     */
    private boolean isMoreFlowFiles(DataInputStream dataInputStream, int i) throws IOException {
        final int indicator = dataInputStream.read();
        if (indicator < 0) {
            throw new EOFException();
        }

        switch (indicator) {
            case 49:
                logger.debug("Peer indicates that there is another FlowFile in transaction");
                return true;
            case 50:
                logger.debug("Peer indicates that there are no more FlowFiles in transaction");
                return false;
            default:
                throw new IOException("Expected to receive 'More FlowFiles' indicator (49) or 'No More FlowFiles' indicator (50) but received invalid value of " + indicator);
        }
    }

    /**
     * Receives a single FlowFile: a length-prefixed section holding the attributes and
     * three timestamps, followed by the content.
     *
     * <p>The first section is gzip-decompressed unless compression is
     * {@code DO_NOT_COMPRESS}; the content is decompressed only for
     * {@code COMPRESS_ATTRIBUTES_AND_CONTENT}. The resulting record gets a freshly
     * generated UUID; the UUID sent by the peer is kept as the remote UUID.
     *
     * @param dataInputStream        stream to read the FlowFile from
     * @param outputStream           stream of the Content Claim that the content is written to
     * @param contentClaim           the Content Claim that the content is written to
     * @param j                      offset within the claim at which this content starts
     * @param i                      negotiated protocol version (not used by this method)
     * @param str                    description of the peer, used in messages
     * @param loadBalanceCompression compression applied by the peer
     * @return the created FlowFile record together with the peer's UUID for it
     * @throws IOException if the FlowFile cannot be read or its content cannot be written
     */
    private RemoteFlowFileRecord receiveFlowFile(DataInputStream dataInputStream, OutputStream outputStream, ContentClaim contentClaim, long j, int i, String str, LoadBalanceCompression loadBalanceCompression) throws IOException {
        final int metadataLength = dataInputStream.readInt();
        final DataInputStream boundedIn = new DataInputStream(new LimitingInputStream(dataInputStream, metadataLength));

        final DataInputStream metadataIn;
        if (loadBalanceCompression == LoadBalanceCompression.DO_NOT_COMPRESS) {
            metadataIn = boundedIn;
        } else {
            metadataIn = new DataInputStream(new GZIPInputStream(boundedIn));
        }

        final Map<String, String> attributes = readAttributes(metadataIn);
        final String remoteUuid = attributes.get(CoreAttributes.UUID.key());
        logger.debug("Received Attributes {} from Peer {}", attributes, str);

        // The three timestamps are sent in this order.
        final long lineageStartDate = metadataIn.readLong();
        final long entryDate = metadataIn.readLong();
        final long penaltyExpiration = metadataIn.readLong();

        // The content is read from the original stream, not the bounded one.
        final boolean contentCompressed = loadBalanceCompression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT;
        final ContentClaimTriple content = consumeContent(dataInputStream, outputStream, contentClaim, j, str, contentCompressed);

        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
                .id(this.flowFileRepository.getNextFlowFileSequence())
                .addAttributes(attributes)
                .addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString())
                .contentClaim(content.getContentClaim())
                .contentClaimOffset(content.getClaimOffset())
                .size(content.getContentLength())
                .entryDate(entryDate)
                .lineageStart(lineageStartDate, this.lineageStartIndex.getAndIncrement())
                .penaltyExpirationTime(penaltyExpiration)
                .build();

        logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFile, attributes.size(), content.getContentLength());
        return new RemoteFlowFileRecord(remoteUuid, flowFile);
    }

    /**
     * Reads the FlowFile attributes: an int count followed by that many name/value
     * pairs, each string read with {@code readLongString}.
     *
     * @param dataInputStream stream to read the attributes from
     * @return the attributes keyed by name
     * @throws IOException if the attributes cannot be read
     */
    private Map<String, String> readAttributes(DataInputStream dataInputStream) throws IOException {
        final int attributeCount = dataInputStream.readInt();
        final Map<String, String> attributes = new HashMap<>();

        for (int index = 0; index < attributeCount; index++) {
            final String name = readLongString(dataInputStream);
            final String value = readLongString(dataInputStream);
            logger.trace("Received attribute '{}' = '{}'", name, value);
            attributes.put(name, value);
        }

        return attributes;
    }

    /**
     * Reads a string encoded as an int byte length followed by that many UTF-8 bytes.
     *
     * @param dataInputStream stream to read the string from
     * @return the decoded string
     * @throws IOException if the length prefix is negative or the bytes cannot be read
     */
    private String readLongString(DataInputStream dataInputStream) throws IOException {
        final int length = dataInputStream.readInt();

        // The length comes from the peer. Reject a negative value as a protocol error
        // instead of letting the array allocation throw NegativeArraySizeException.
        if (length < 0) {
            throw new IOException("Received invalid string length of " + length);
        }

        final byte[] bytes = new byte[length];
        StreamUtils.fillBuffer(dataInputStream, bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Reads the content of one FlowFile, sent as a sequence of data frames, and writes
     * it to the given Content Claim stream.
     *
     * <p>Each frame is announced by an indicator byte: 66 means a frame follows (an int
     * frame length, then the frame bytes), 64 means there are no (more) frames, and 36
     * means the peer aborts the transaction. When the first indicator is 64 the FlowFile
     * has no content and a triple with a {@code null} claim is returned.
     *
     * @param dataInputStream stream to read the frames from
     * @param outputStream    stream of the Content Claim that the content is written to
     * @param contentClaim    the Content Claim reported in the returned triple
     * @param j               offset within the claim reported in the returned triple
     * @param str             description of the peer, used in messages
     * @param z               whether each frame is gzip-compressed
     * @return the claim, offset and number of (decompressed) bytes written
     * @throws IOException if the stream ends early, the peer aborts, or an unexpected
     *                     indicator is received
     */
    private ContentClaimTriple consumeContent(DataInputStream dataInputStream, OutputStream outputStream, ContentClaim contentClaim, long j, String str, boolean z) throws IOException {
        logger.debug("Consuming content from Peer {}", str);

        final int firstIndicator = dataInputStream.read();
        if (firstIndicator < 0) {
            throw new EOFException("Encountered End-of-File when expecting to read Data Frame Indicator from Peer " + str);
        }
        if (firstIndicator == 64) {
            logger.debug("Peer {} indicates that there is no Data Frame for the FlowFile", str);
            return new ContentClaimTriple(null, 0L, 0L);
        }
        if (firstIndicator == 36) {
            throw new TransactionAbortedException("Peer " + str + " requested that transaction be aborted");
        }
        if (firstIndicator != 66) {
            throw new IOException("Expected a Data Frame Indicator from Peer " + str + " but received a value of " + firstIndicator);
        }

        int dataFrameLength = dataInputStream.readInt();
        logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, str);

        final byte[] buffer = getDataBuffer();
        long totalBytesWritten = 0L;

        while (true) {
            // Count the raw bytes consumed from the frame so that a short frame is detected
            // even when the payload is decompressed on the way through.
            final ByteCountingInputStream frameIn = new ByteCountingInputStream(new LimitedInputStream(dataInputStream, dataFrameLength));
            final InputStream contentIn = z ? new GZIPInputStream(frameIn) : frameIn;
            final int bytesBuffered = StreamUtils.fillBuffer(contentIn, buffer, false);

            if (frameIn.getBytesRead() < dataFrameLength) {
                throw new EOFException("Expected to receive a Data Frame of length " + dataFrameLength + " bytes but received only " + frameIn.getBytesRead() + " bytes");
            }

            outputStream.write(buffer, 0, bytesBuffered);
            totalBytesWritten += bytesBuffered;

            final int nextIndicator = dataInputStream.read();
            if (nextIndicator < 0) {
                throw new EOFException("Encountered End-of-File when expecting to receive a Data Frame Indicator");
            }
            if (nextIndicator == 64) {
                logger.debug("Peer {} indicated that no more data frames are available", str);
                return new ContentClaimTriple(contentClaim, j, totalBytesWritten);
            }
            if (nextIndicator == 36) {
                logger.debug("Peer {} requested that transaction be aborted by sending Data Frame Length of {}", str, dataFrameLength);
                throw new TransactionAbortedException("Peer " + str + " requested that transaction be aborted");
            }
            if (nextIndicator != 66) {
                throw new IOException("Expected a Data Frame Indicator from Peer " + str + " but received a value of " + nextIndicator);
            }

            dataFrameLength = dataInputStream.readInt();
            logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, str);
        }
    }
}
