package org.apache.nifi.controller.queue.clustered.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.class */
public class RemoteQueuePartition implements QueuePartition {
    private static final Logger logger = LoggerFactory.getLogger(RemoteQueuePartition.class);
    private final NodeIdentifier nodeIdentifier;
    private final SwappablePriorityQueue priorityQueue;
    private final LoadBalancedFlowFileQueue flowFileQueue;
    private final TransferFailureDestination failureDestination;
    private final FlowFileRepository flowFileRepo;
    private final ProvenanceEventRepository provRepo;
    private final ContentRepository contentRepo;
    private final AsyncLoadBalanceClientRegistry clientRegistry;
    private boolean running = false;
    private final String description;

    public RemoteQueuePartition(NodeIdentifier nodeIdentifier, SwappablePriorityQueue swappablePriorityQueue, TransferFailureDestination transferFailureDestination, FlowFileRepository flowFileRepository, ProvenanceEventRepository provenanceEventRepository, ContentRepository contentRepository, AsyncLoadBalanceClientRegistry asyncLoadBalanceClientRegistry, LoadBalancedFlowFileQueue loadBalancedFlowFileQueue) {
        this.nodeIdentifier = nodeIdentifier;
        this.priorityQueue = swappablePriorityQueue;
        this.flowFileQueue = loadBalancedFlowFileQueue;
        this.failureDestination = transferFailureDestination;
        this.flowFileRepo = flowFileRepository;
        this.provRepo = provenanceEventRepository;
        this.contentRepo = contentRepository;
        this.clientRegistry = asyncLoadBalanceClientRegistry;
        this.description = "RemoteQueuePartition[queueId=" + loadBalancedFlowFileQueue.getIdentifier() + ", nodeId=" + this.nodeIdentifier + "]";
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public QueueSize size() {
        return this.priorityQueue.size();
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public long getTotalActiveQueuedDuration(long j) {
        return this.priorityQueue.getTotalQueuedDuration(j);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public long getMinLastQueueDate() {
        return this.priorityQueue.getMinLastQueueDate();
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public String getSwapPartitionName() {
        return this.nodeIdentifier.getId();
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public Optional<NodeIdentifier> getNodeIdentifier() {
        return Optional.ofNullable(this.nodeIdentifier);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void put(FlowFileRecord flowFileRecord) {
        this.priorityQueue.put(flowFileRecord);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void putAll(Collection<FlowFileRecord> collection) {
        this.priorityQueue.putAll(collection);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String str) {
        this.priorityQueue.dropFlowFiles(dropFlowFileRequest, str);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public SwapSummary recoverSwappedFlowFiles() {
        return this.priorityQueue.recoverSwappedFlowFiles();
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public FlowFileQueueContents packageForRebalance(String str) {
        return this.priorityQueue.packageForRebalance(str);
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void setPriorities(List<FlowFilePrioritizer> list) {
        this.priorityQueue.setPriorities(list);
    }

    private FlowFileRecord getFlowFile() {
        HashSet hashSet = new HashSet();
        FlowFileRecord poll = this.priorityQueue.poll(hashSet, this.flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS), PollStrategy.ALL_FLOWFILES);
        this.flowFileQueue.handleExpiredRecords(hashSet);
        return poll;
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public synchronized void start(final FlowFilePartitioner flowFilePartitioner) {
        if (this.running) {
            return;
        }
        TransactionFailureCallback transactionFailureCallback = new TransactionFailureCallback() { // from class: org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition.1
            @Override // org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback
            public void onTransactionFailed(List<FlowFileRecord> list, Exception exc, TransactionFailureCallback.TransactionPhase transactionPhase) {
                RemoteQueuePartition.this.priorityQueue.acknowledge(list);
                if (exc instanceof ContentNotFoundException) {
                    Optional flowFile = ((ContentNotFoundException) exc).getFlowFile();
                    if (flowFile.isPresent()) {
                        ArrayList arrayList = new ArrayList(list);
                        FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile.get();
                        arrayList.remove(flowFileRecord);
                        StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(RemoteQueuePartition.this.flowFileQueue, flowFileRecord);
                        standardRepositoryRecord.markForAbort();
                        RemoteQueuePartition.this.updateRepositories(Collections.emptyList(), Collections.singleton(standardRepositoryRecord), null);
                        if (transactionPhase == TransactionFailureCallback.TransactionPhase.CONNECTING) {
                            TransferFailureDestination transferFailureDestination = RemoteQueuePartition.this.failureDestination;
                            SwappablePriorityQueue swappablePriorityQueue = RemoteQueuePartition.this.priorityQueue;
                            Objects.requireNonNull(swappablePriorityQueue);
                            transferFailureDestination.putAll(swappablePriorityQueue::packageForRebalance, flowFilePartitioner);
                        }
                        RemoteQueuePartition.this.failureDestination.putAll(arrayList, flowFilePartitioner);
                        RemoteQueuePartition.this.flowFileQueue.onTransfer(Collections.singleton(flowFileRecord));
                        return;
                    }
                }
                if (transactionPhase == TransactionFailureCallback.TransactionPhase.CONNECTING) {
                    TransferFailureDestination transferFailureDestination2 = RemoteQueuePartition.this.failureDestination;
                    SwappablePriorityQueue swappablePriorityQueue2 = RemoteQueuePartition.this.priorityQueue;
                    Objects.requireNonNull(swappablePriorityQueue2);
                    transferFailureDestination2.putAll(swappablePriorityQueue2::packageForRebalance, flowFilePartitioner);
                }
                RemoteQueuePartition.this.failureDestination.putAll(list, flowFilePartitioner);
            }

            @Override // org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback
            public boolean isRebalanceOnFailure() {
                return RemoteQueuePartition.this.failureDestination.isRebalanceOnFailure(flowFilePartitioner);
            }
        };
        TransactionCompleteCallback transactionCompleteCallback = new TransactionCompleteCallback() { // from class: org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition.2
            @Override // org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback
            public void onTransactionComplete(List<FlowFileRecord> list, NodeIdentifier nodeIdentifier) {
                RemoteQueuePartition.this.priorityQueue.acknowledge(list);
                RemoteQueuePartition.this.flowFileQueue.onTransfer(list);
                RemoteQueuePartition.this.updateRepositories(list, Collections.emptyList(), nodeIdentifier);
            }
        };
        SwappablePriorityQueue swappablePriorityQueue = this.priorityQueue;
        Objects.requireNonNull(swappablePriorityQueue);
        BooleanSupplier booleanSupplier = swappablePriorityQueue::isEmpty;
        AsyncLoadBalanceClientRegistry asyncLoadBalanceClientRegistry = this.clientRegistry;
        String identifier = this.flowFileQueue.getIdentifier();
        NodeIdentifier nodeIdentifier = this.nodeIdentifier;
        Supplier<FlowFileRecord> supplier = this::getFlowFile;
        LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = this.flowFileQueue;
        Objects.requireNonNull(loadBalancedFlowFileQueue);
        Supplier<LoadBalanceCompression> supplier2 = loadBalancedFlowFileQueue::getLoadBalanceCompression;
        LoadBalancedFlowFileQueue loadBalancedFlowFileQueue2 = this.flowFileQueue;
        Objects.requireNonNull(loadBalancedFlowFileQueue2);
        asyncLoadBalanceClientRegistry.register(identifier, nodeIdentifier, booleanSupplier, supplier, transactionFailureCallback, transactionCompleteCallback, supplier2, loadBalancedFlowFileQueue2::isPropagateBackpressureAcrossNodes);
        this.running = true;
    }

    public void onRemoved() {
        this.clientRegistry.unregister(this.flowFileQueue.getIdentifier(), this.nodeIdentifier);
    }

    private void updateRepositories(List<FlowFileRecord> list, Collection<RepositoryRecord> collection, NodeIdentifier nodeIdentifier) {
        ArrayList arrayList = new ArrayList((list.size() * 2) + collection.size());
        for (FlowFileRecord flowFileRecord : list) {
            arrayList.add(createSendEvent(flowFileRecord, nodeIdentifier));
            arrayList.add(createDropEvent(flowFileRecord));
        }
        Iterator<RepositoryRecord> it = collection.iterator();
        while (it.hasNext()) {
            arrayList.add(createDropEvent(it.next().getCurrent(), "Content Not Found"));
        }
        this.provRepo.registerEvents(arrayList);
        List list2 = (List) list.stream().map(this::createRepositoryRecord).collect(Collectors.toCollection(ArrayList::new));
        list2.addAll(collection);
        try {
            this.flowFileRepo.updateRepository(list2);
        } catch (Exception e) {
            logger.error("Unable to update FlowFile repository to indicate that {} FlowFiles have been transferred to {}. It is possible that these FlowFiles will be duplicated upon restart of NiFi.", new Object[]{Integer.valueOf(list.size()), getNodeIdentifier(), e});
        }
    }

    private RepositoryRecord createRepositoryRecord(FlowFileRecord flowFileRecord) {
        StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(this.flowFileQueue, flowFileRecord);
        standardRepositoryRecord.markForDelete();
        return standardRepositoryRecord;
    }

    private ProvenanceEventRecord createSendEvent(FlowFileRecord flowFileRecord, NodeIdentifier nodeIdentifier) {
        ProvenanceEventBuilder transitUri = new StandardProvenanceEventRecord.Builder().fromFlowFile(flowFileRecord).setEventType(ProvenanceEventType.SEND).setDetails("Re-distributed for Load-balanced connection").setComponentId(this.flowFileQueue.getIdentifier()).setComponentType("Connection").setSourceQueueIdentifier(this.flowFileQueue.getIdentifier()).setSourceSystemFlowFileIdentifier(flowFileRecord.getAttribute(CoreAttributes.UUID.key())).setTransitUri("nifi://" + nodeIdentifier.getApiAddress() + "/loadbalance/" + this.flowFileQueue.getIdentifier());
        ContentClaim contentClaim = flowFileRecord.getContentClaim();
        if (contentClaim != null) {
            ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            transitUri.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
            transitUri.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
        }
        return transitUri.build();
    }

    private ProvenanceEventRecord createDropEvent(FlowFileRecord flowFileRecord) {
        return createDropEvent(flowFileRecord, null);
    }

    private ProvenanceEventRecord createDropEvent(FlowFileRecord flowFileRecord, String str) {
        ProvenanceEventBuilder sourceQueueIdentifier = new StandardProvenanceEventRecord.Builder().fromFlowFile(flowFileRecord).setEventType(ProvenanceEventType.DROP).setDetails(str).setComponentId(this.flowFileQueue.getIdentifier()).setComponentType("Connection").setSourceQueueIdentifier(this.flowFileQueue.getIdentifier());
        ContentClaim contentClaim = flowFileRecord.getContentClaim();
        if (contentClaim != null) {
            ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            sourceQueueIdentifier.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
            sourceQueueIdentifier.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
        }
        return sourceQueueIdentifier.build();
    }

    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public synchronized void stop() {
        this.running = false;
        this.clientRegistry.unregister(this.flowFileQueue.getIdentifier(), this.nodeIdentifier);
    }

    public RemoteQueuePartitionDiagnostics getDiagnostics() {
        return new StandardRemoteQueuePartitionDiagnostics(this.nodeIdentifier.toString(), this.priorityQueue.getFlowFileQueueSize());
    }

    public String toString() {
        return this.description;
    }
}
