package org.apache.nifi.controller.queue;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.IncompleteSwapFileException;
import org.apache.nifi.controller.repository.SwapContents;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.concurrency.TimedLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/SwappablePriorityQueue.class */
public class SwappablePriorityQueue {
    private static final Logger logger = LoggerFactory.getLogger(SwappablePriorityQueue.class);

    // NOTE(review): the methods in this class use the literal 10000 for the swap batch size and for the
    // per-call cap on expired records; presumably the compiler inlined these two constants - confirm.
    private static final int SWAP_RECORD_POLL_SIZE = 10000;
    private static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 10000;

    // Upper bound on the active queue size; once reached, new FlowFiles go to the swap queue.
    private final int swapThreshold;
    private final FlowFileSwapManager swapManager;
    // May be null; every use in this class is null-checked.
    private final EventReporter eventReporter;
    private final FlowFileQueue flowFileQueue;
    private final DropFlowFileAction dropAction;
    private final String swapPartitionName;

    // Both timed locks wrap the two halves of a single fair ReentrantReadWriteLock (see constructor).
    private final TimedLock readLock;
    private final TimedLock writeLock;

    private final List<FlowFilePrioritizer> priorities = new ArrayList<>();
    // Locations of swap files written so far; index 0 is the next one to be swapped in.
    private final List<String> swapLocations = new ArrayList<>();
    private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0, 0, 0, 0, 0, 0));
    // While true, put()/putAll() add to the swap queue even if the active queue has room.
    private boolean swapMode = false;
    // Per swap location: smallest and summed "last queue date" of the records written to it.
    private final Map<String, Long> minQueueDateInSwapLocation = new HashMap<>();
    private final Map<String, Long> totalQueueDateInSwapLocation = new HashMap<>();

    // FlowFiles available for polling, ordered by the configured prioritizers.
    private PriorityQueue<FlowFileRecord> activeQueue = new PriorityQueue<>(20, new QueuePrioritizer(Collections.emptyList()));
    // FlowFiles held in memory that are waiting to be written to a swap file or migrated to the active queue.
    private ArrayList<FlowFileRecord> swapQueue = new ArrayList<>();

    /**
     * Builds an empty queue whose read and write locks share one fair {@link ReentrantReadWriteLock}.
     *
     * @param flowFileSwapManager manager used to write and read swap files
     * @param i                   swap threshold: maximum size of the active queue before swapping starts
     * @param eventReporter       reporter for swap failures; may be {@code null}
     * @param flowFileQueue       owning queue, also used to name the locks
     * @param dropFlowFileAction  action invoked when FlowFiles are dropped
     * @param str                 partition name passed to the swap manager on swap-out
     */
    public SwappablePriorityQueue(FlowFileSwapManager flowFileSwapManager, int i, EventReporter eventReporter, FlowFileQueue flowFileQueue, DropFlowFileAction dropFlowFileAction, String str) {
        this.swapManager = flowFileSwapManager;
        this.swapThreshold = i;
        this.eventReporter = eventReporter;
        this.flowFileQueue = flowFileQueue;
        this.dropAction = dropFlowFileAction;
        this.swapPartitionName = str;

        // "true" requests the fair ordering policy.
        final ReentrantReadWriteLock fairLock = new ReentrantReadWriteLock(true);
        final String readLockName = flowFileQueue.getIdentifier() + " Read Lock";
        this.readLock = new TimedLock(fairLock.readLock(), readLockName, 100);
        final String writeLockName = flowFileQueue.getIdentifier() + " Write Lock";
        this.writeLock = new TimedLock(fairLock.writeLock(), writeLockName, 100);
    }

    /**
     * @return the identifier of the owning FlowFile queue
     */
    private String getQueueIdentifier() {
        final String identifier = this.flowFileQueue.getIdentifier();
        return identifier;
    }

    /**
     * Returns a read-only view of the configured prioritizers, obtained under the read lock.
     *
     * @return unmodifiable view backed by the internal prioritizer list
     */
    public List<FlowFilePrioritizer> getPriorities() {
        final List<FlowFilePrioritizer> readOnlyView;
        this.readLock.lock();
        try {
            readOnlyView = Collections.unmodifiableList(this.priorities);
        } finally {
            this.readLock.unlock("getPriorities");
        }
        return readOnlyView;
    }

    /**
     * Replaces the prioritizers and rebuilds the active queue so it is ordered by the new ones.
     * Runs under the write lock.
     *
     * @param list the new prioritizers
     */
    public void setPriorities(List<FlowFilePrioritizer> list) {
        this.writeLock.lock();
        try {
            this.priorities.clear();
            this.priorities.addAll(list);

            // Re-insert every active FlowFile into a queue that uses the new comparator.
            final int initialCapacity = Math.max(20, this.activeQueue.size());
            final PriorityQueue<FlowFileRecord> reordered = new PriorityQueue<>(initialCapacity, new QueuePrioritizer(list));
            reordered.addAll(this.activeQueue);
            this.activeQueue = reordered;
        } finally {
            this.writeLock.unlock("setPriorities");
        }
    }

    /**
     * Builds a diagnostics snapshot of this partition under the read lock.
     *
     * @return diagnostics holding the current queue size and two penalization flags
     */
    public LocalQueuePartitionDiagnostics getQueueDiagnostics() {
        this.readLock.lock();
        try {
            // True when the active queue is non-empty and its head is penalized.
            final boolean headPenalized = !this.activeQueue.isEmpty() && this.activeQueue.peek().isPenalized();
            // NOTE(review): because the head is already penalized, anyMatch makes the third argument
            // always equal to headPenalized - confirm whether allMatch was intended.
            return new StandardLocalQueuePartitionDiagnostics(
                    getFlowFileQueueSize(),
                    headPenalized,
                    headPenalized && this.activeQueue.stream().anyMatch(record -> record.isPenalized()));
        } finally {
            this.readLock.unlock("getQueueDiagnostics");
        }
    }

    /**
     * Copies the contents of the active queue into a new list while holding the read lock.
     *
     * @return a new list containing the FlowFiles currently in the active queue
     */
    public List<FlowFileRecord> getActiveFlowFiles() {
        final List<FlowFileRecord> snapshot;
        this.readLock.lock();
        try {
            snapshot = new ArrayList<>(this.activeQueue);
        } finally {
            this.readLock.unlock("getActiveFlowFiles");
        }
        return snapshot;
    }

    /**
     * @return {@code true} if the current size snapshot reports at least one unacknowledged FlowFile
     */
    public boolean isUnacknowledgedFlowFile() {
        final int unacknowledged = getFlowFileQueueSize().getUnacknowledgedCount();
        return unacknowledged > 0;
    }

    /**
     * Writes the in-memory swap queue out to swap files once it holds at least 10000 records.
     * <p>
     * The method first gives {@link #migrateSwapToActive()} a chance to shrink the swap queue. If 10000 or
     * more records remain, they are ordered with the queue's prioritizers and written in batches of 10000
     * through the swap manager. Records that were not written are put back into the swap queue, the size
     * counters are updated, and the newly written swap locations are appended to {@code swapLocations}.
     * <p>
     * NOTE(review): the callers visible in this file (put/putAll) invoke this while holding the write lock.
     */
    private void writeSwapFilesIfNecessary() {
        long j;
        if (this.swapQueue.size() < 10000) {
            return;
        }
        migrateSwapToActive();
        if (this.swapQueue.size() < 10000) {
            return;
        }
        // Number of full 10000-record batches that can be written.
        int size = this.swapQueue.size() / 10000;
        // Record count and byte total of the swap queue before anything is written.
        int size2 = this.swapQueue.size();
        long j2 = 0;
        Iterator<FlowFileRecord> it = this.swapQueue.iterator();
        while (it.hasNext()) {
            j2 += it.next().getSize();
        }
        // Order the swap queue with the configured prioritizers so batches are polled in priority order.
        PriorityQueue priorityQueue = new PriorityQueue(this.swapQueue.size(), new QueuePrioritizer(getPriorities()));
        priorityQueue.addAll(this.swapQueue);
        // Running totals of bytes and records successfully written to swap files.
        long j3 = 0;
        int i = 0;
        ArrayList arrayList = new ArrayList(size);
        for (int i2 = 0; i2 < size; i2++) {
            // Per-batch byte total, summed last-queue-date and minimum last-queue-date.
            long j4 = 0;
            long j5 = 0;
            long j6 = Long.MAX_VALUE;
            ArrayList arrayList2 = new ArrayList(10000);
            for (int i3 = 0; i3 < 10000; i3++) {
                FlowFileRecord flowFileRecord = (FlowFileRecord) priorityQueue.poll();
                arrayList2.add(flowFileRecord);
                j4 += flowFileRecord.getSize();
                j5 += flowFileRecord.getLastQueueDate().longValue();
                j6 = j6 < flowFileRecord.getLastQueueDate().longValue() ? j6 : flowFileRecord.getLastQueueDate().longValue();
            }
            try {
                // The batch is reversed before being handed to the swap manager.
                // NOTE(review): the reason for the reversal is not visible in this file.
                Collections.reverse(arrayList2);
                String swapOut = this.swapManager.swapOut(arrayList2, this.flowFileQueue, this.swapPartitionName);
                arrayList.add(swapOut);
                logger.debug("Successfully wrote out Swap File {} containing {} FlowFiles ({} bytes)", new Object[]{swapOut, Integer.valueOf(arrayList2.size()), Long.valueOf(j4)});
                j3 += j4;
                i += arrayList2.size();
                this.minQueueDateInSwapLocation.put(swapOut, Long.valueOf(j6));
                this.totalQueueDateInSwapLocation.put(swapOut, Long.valueOf(j5));
            } catch (IOException e) {
                // The write failed: return the batch to the temporary queue so the records are not lost.
                // The loop then continues with the next iteration.
                priorityQueue.addAll(arrayList2);
                int flowFileCount = getFlowFileCount();
                logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk due to {}", new Object[]{getQueueIdentifier(), Integer.valueOf(flowFileCount), e.toString()});
                logger.error("", e);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getQueueIdentifier() + " has " + flowFileCount + " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. See logs for more information.");
                }
            }
        }
        // Whatever is left in the temporary queue becomes the new swap queue; j ends up as its byte total.
        this.swapQueue.clear();
        long j7 = 0;
        while (true) {
            j = j7;
            FlowFileRecord flowFileRecord2 = (FlowFileRecord) priorityQueue.poll();
            if (flowFileRecord2 == null) {
                break;
            }
            this.swapQueue.add(flowFileRecord2);
            j7 = j + flowFileRecord2.getSize();
        }
        Collections.reverse(this.swapQueue);
        // Retry until updateSize reports that the new size was applied.
        // NOTE(review): the swap file count is increased by the planned number of batches ("size"),
        // even when some writes failed above - confirm this is intended.
        boolean z = false;
        while (!z) {
            FlowFileQueueSize flowFileQueueSize = getFlowFileQueueSize();
            FlowFileQueueSize flowFileQueueSize2 = new FlowFileQueueSize(flowFileQueueSize.getActiveCount(), flowFileQueueSize.getActiveBytes(), flowFileQueueSize.getSwappedCount() + (this.swapQueue.size() - size2) + i, flowFileQueueSize.getSwappedBytes() + (j - j2) + j3, flowFileQueueSize.getSwapFileCount() + size, flowFileQueueSize.getUnacknowledgedCount(), flowFileQueueSize.getUnacknowledgedBytes());
            z = updateSize(flowFileQueueSize, flowFileQueueSize2);
            if (z) {
                logIfNegative(flowFileQueueSize, flowFileQueueSize2, "swap");
            }
        }
        this.swapLocations.addAll(arrayList);
        logger.debug("After writing swap files, setting new set of Swap Locations to {}", this.swapLocations);
    }

    /**
     * @return the sum of the active, swapped and unacknowledged counts from one size snapshot
     */
    private int getFlowFileCount() {
        final FlowFileQueueSize snapshot = getFlowFileQueueSize();
        int total = snapshot.getActiveCount();
        total += snapshot.getSwappedCount();
        total += snapshot.getUnacknowledgedCount();
        return total;
    }

    /**
     * Refills an empty active queue from swapped data.
     * <p>
     * Does nothing while the active queue still has entries. If swap files exist, one is swapped in.
     * Otherwise records are moved from the in-memory swap queue, in prioritizer order, until the active
     * queue reaches the swap threshold; the remainder stays in the swap queue.
     */
    private void migrateSwapToActive() {
        if (!this.activeQueue.isEmpty()) {
            return;
        }
        if (!this.swapLocations.isEmpty()) {
            swapIn();
            return;
        }

        final FlowFileQueueSize sizeSnapshot = getFlowFileQueueSize();
        final boolean nothingSwapped = sizeSnapshot.getSwappedCount() == 0 && this.swapQueue.isEmpty();
        if (nothingSwapped || sizeSnapshot.getSwappedCount() > this.swapQueue.size()) {
            return;
        }

        // Order the swap queue with the configured prioritizers before moving records over.
        final PriorityQueue<FlowFileRecord> prioritized = new PriorityQueue<>(this.swapQueue.size(), new QueuePrioritizer(getPriorities()));
        prioritized.addAll(this.swapQueue);

        int migratedCount = 0;
        long migratedBytes = 0;
        while (this.activeQueue.size() < this.swapThreshold) {
            final FlowFileRecord next = prioritized.poll();
            if (next == null) {
                break;
            }
            this.activeQueue.add(next);
            migratedBytes += next.getSize();
            migratedCount++;
        }

        // Whatever did not fit goes back into the swap queue.
        this.swapQueue.clear();
        for (FlowFileRecord leftover = prioritized.poll(); leftover != null; leftover = prioritized.poll()) {
            this.swapQueue.add(leftover);
        }

        if (migratedCount > 0) {
            incrementActiveQueueSize(migratedCount, migratedBytes);
            incrementSwapQueueSize(-migratedCount, -migratedBytes, 0);
            logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", Integer.valueOf(migratedCount), this);
        }

        // Uses the snapshot taken before the migration.
        if (sizeSnapshot.getSwappedCount() == 0) {
            this.swapMode = false;
        }
    }

    /**
     * Swaps the first swap location back into the active queue and adjusts the size counters.
     * <p>
     * A truncated swap file contributes whatever records could be read. A missing swap file is forgotten.
     * On any other {@link IOException} the location is kept in {@code swapLocations} and nothing is
     * swapped in.
     */
    private void swapIn() {
        final String location = this.swapLocations.get(0);
        boolean partial = false;
        SwapContents contents;
        try {
            logger.debug("Attempting to swap in {}; all swap locations = {}", location, this.swapLocations);
            contents = this.swapManager.swapIn(location, this.flowFileQueue);
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(location);
            this.totalQueueDateInSwapLocation.remove(location);
        } catch (IncompleteSwapFileException truncated) {
            logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", location);
            logger.error("", truncated);
            contents = truncated.getPartialContents();
            partial = true;
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(location);
            this.totalQueueDateInSwapLocation.remove(location);
        } catch (FileNotFoundException missing) {
            logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", location);
            if (this.eventReporter != null) {
                this.eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + location + " because the Swap File can no longer be found");
            }
            this.swapLocations.remove(0);
            this.minQueueDateInSwapLocation.remove(location);
            this.totalQueueDateInSwapLocation.remove(location);
            return;
        } catch (IOException unreadable) {
            // The swap location is deliberately left in place here.
            logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", location);
            logger.error("", unreadable);
            if (this.eventReporter != null) {
                this.eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + location + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information.");
            }
            return;
        } catch (Throwable unexpected) {
            logger.error("Failed to swap in FlowFiles from Swap File {}", location, unexpected);
            throw unexpected;
        }

        // The swapped counters shrink by what the file's summary says it contained.
        final QueueSize summarySize = contents.getSummary().getQueueSize();
        final long summaryBytes = summarySize.getByteCount();
        final int summaryCount = summarySize.getObjectCount();
        incrementSwapQueueSize(-summaryCount, -summaryBytes, -1);

        if (!partial) {
            incrementActiveQueueSize(summaryCount, summaryBytes);
            logger.debug("Successfully swapped in Swap File {} containing {} FlowFiles ({} bytes)", location, Integer.valueOf(summaryCount), Long.valueOf(summaryBytes));
        } else {
            // Only count the records that were actually recovered from the truncated file.
            long recoveredBytes = 0;
            for (FlowFileRecord recovered : contents.getFlowFiles()) {
                recoveredBytes += recovered.getSize();
            }
            incrementActiveQueueSize(contents.getFlowFiles().size(), recoveredBytes);
            logger.debug("Swapped in partial contents containing {} FlowFiles ({} bytes) from {}", Integer.valueOf(contents.getFlowFiles().size()), Long.valueOf(recoveredBytes), location);
        }
        this.activeQueue.addAll(contents.getFlowFiles());
    }

    /**
     * @return the current size snapshot converted to a {@code QueueSize}
     */
    public QueueSize size() {
        final QueueSize current = getFlowFileQueueSize().toQueueSize();
        return current;
    }

    /**
     * @return whether the current size snapshot reports itself as empty
     */
    public boolean isEmpty() {
        final boolean empty = getFlowFileQueueSize().isEmpty();
        return empty;
    }

    /**
     * @return {@code true} when one size snapshot reports both the active count and the swapped count as zero
     */
    public boolean isActiveQueueEmpty() {
        final FlowFileQueueSize snapshot = getFlowFileQueueSize();
        if (snapshot.getActiveCount() != 0) {
            return false;
        }
        return snapshot.getSwappedCount() == 0;
    }

    /**
     * Reports whether a FlowFile can currently be pulled from this queue.
     * <p>
     * The head of the active queue is inspected under the read lock; if the active queue is empty the
     * first entry of the in-memory swap queue is used instead. When only swap files hold data, a
     * best-effort attempt is made to migrate swapped data into the active queue.
     * <p>
     * The read lock is released exactly once, and always before the write lock is attempted. Both locks
     * are created from the same {@code ReentrantReadWriteLock}, which does not allow upgrading a held
     * read lock to the write lock, so trying for the write lock while still holding the read lock could
     * never succeed (assuming {@code TimedLock} simply delegates to the wrapped lock).
     *
     * @return {@code ACTIVE_QUEUE_EMPTY} if nothing is available, {@code HEAD_OF_QUEUE_PENALIZED} if the
     *         inspected FlowFile is penalized, otherwise {@code FLOWFILE_AVAILABLE}
     */
    public FlowFileAvailability getFlowFileAvailability() {
        // Cheap check first: no lock is needed when the counters say there is nothing queued.
        final FlowFileQueueSize queueSize = getFlowFileQueueSize();
        if (queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0) {
            return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
        }

        boolean mustMigrateSwapToActive = false;
        FlowFileRecord head;
        this.readLock.lock();
        try {
            head = this.activeQueue.peek();
            if (head == null) {
                if (!this.swapQueue.isEmpty()) {
                    head = this.swapQueue.get(0);
                } else if (queueSize.getSwapFileCount() > 0) {
                    // Data exists only in swap files; it has to be swapped in under the write lock.
                    mustMigrateSwapToActive = true;
                } else {
                    return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
                }
            }
        } finally {
            this.readLock.unlock("isFlowFileAvailable");
        }

        // Best effort: skip the migration if another thread currently holds the lock.
        if (mustMigrateSwapToActive && this.writeLock.tryLock()) {
            try {
                migrateSwapToActive();
            } finally {
                this.writeLock.unlock("getFlowFileAvailability");
            }
        }

        if (head == null) {
            return FlowFileAvailability.ACTIVE_QUEUE_EMPTY;
        }
        return head.isPenalized() ? FlowFileAvailability.HEAD_OF_QUEUE_PENALIZED : FlowFileAvailability.FLOWFILE_AVAILABLE;
    }

    /**
     * Acknowledges a single FlowFile by reducing the unacknowledged counters by one record and its size.
     *
     * @param flowFileRecord the FlowFile being acknowledged
     */
    public void acknowledge(FlowFileRecord flowFileRecord) {
        logger.trace("{} Acknowledging {}", this, flowFileRecord);
        final long acknowledgedBytes = flowFileRecord.getSize();
        directlyIncrementUnacknowledgedQueueSize(-1, -acknowledgedBytes);
    }

    /**
     * Acknowledges a batch of FlowFiles by reducing the unacknowledged counters by the batch's record
     * count and total size.
     *
     * @param collection the FlowFiles being acknowledged
     */
    public void acknowledge(Collection<FlowFileRecord> collection) {
        if (logger.isTraceEnabled()) {
            for (FlowFileRecord acknowledged : collection) {
                logger.trace("{} Acknowledging {}", this, acknowledged);
            }
        }

        final int acknowledgedCount = collection.size();
        long acknowledgedBytes = 0L;
        for (FlowFileRecord acknowledged : collection) {
            acknowledgedBytes += acknowledged.getSize();
        }
        directlyIncrementUnacknowledgedQueueSize(-acknowledgedCount, -acknowledgedBytes);
    }

    /**
     * Adds one FlowFile under the write lock. It goes to the active queue unless the queue is in swap
     * mode or the active queue has reached the swap threshold, in which case it goes to the swap queue.
     *
     * @param flowFileRecord the FlowFile to enqueue
     */
    public void put(FlowFileRecord flowFileRecord) {
        this.writeLock.lock();
        try {
            final boolean activeHasRoom = !this.swapMode && this.activeQueue.size() < this.swapThreshold;
            if (activeHasRoom) {
                incrementActiveQueueSize(1, flowFileRecord.getSize());
                this.activeQueue.add(flowFileRecord);
            } else {
                this.swapQueue.add(flowFileRecord);
                incrementSwapQueueSize(1, flowFileRecord.getSize(), 0);
                this.swapMode = true;
                writeSwapFilesIfNecessary();
            }
            logger.trace("{} put to {}", flowFileRecord, this);
        } finally {
            this.writeLock.unlock("put(FlowFileRecord)");
        }
    }

    /**
     * Adds a batch of FlowFiles under the write lock. The whole batch goes to the swap queue when the
     * queue is in swap mode or the batch would not fit under the swap threshold; otherwise it goes to
     * the active queue.
     *
     * @param collection the FlowFiles to enqueue
     */
    public void putAll(Collection<FlowFileRecord> collection) {
        // Totals are computed before taking the lock.
        final int batchCount = collection.size();
        long batchBytes = 0;
        for (FlowFileRecord incoming : collection) {
            batchBytes += incoming.getSize();
        }

        this.writeLock.lock();
        try {
            final boolean activeHasRoom = !this.swapMode && this.activeQueue.size() < this.swapThreshold - batchCount;
            if (activeHasRoom) {
                incrementActiveQueueSize(batchCount, batchBytes);
                this.activeQueue.addAll(collection);
            } else {
                this.swapQueue.addAll(collection);
                incrementSwapQueueSize(batchCount, batchBytes, 0);
                this.swapMode = true;
                writeSwapFilesIfNecessary();
            }
            logger.trace("{} put to {}", collection, this);
        } finally {
            this.writeLock.unlock("putAll");
        }
    }

    /**
     * Polls a single FlowFile using the {@code UNPENALIZED_FLOWFILES} strategy.
     *
     * @param set collection that receives expired FlowFiles encountered while polling
     * @param j   expiration period in milliseconds; values of zero or less disable expiration
     * @return the polled FlowFile, or {@code null} if none is available
     */
    public FlowFileRecord poll(Set<FlowFileRecord> set, long j) {
        final PollStrategy defaultStrategy = PollStrategy.UNPENALIZED_FLOWFILES;
        return poll(set, j, defaultStrategy);
    }

    /**
     * Polls a single FlowFile under the write lock and marks it as unacknowledged.
     *
     * @param set          collection that receives expired FlowFiles encountered while polling
     * @param j            expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy whether penalized FlowFiles may be returned
     * @return the polled FlowFile, or {@code null} if none is available
     */
    public FlowFileRecord poll(Set<FlowFileRecord> set, long j, PollStrategy pollStrategy) {
        final FlowFileRecord polled;
        this.writeLock.lock();
        try {
            polled = doPoll(set, j, pollStrategy);
            if (polled != null) {
                logger.trace("{} poll() returning {}", this, polled);
                unacknowledge(1, polled.getSize());
            }
        } finally {
            this.writeLock.unlock("poll(Set)");
        }
        return polled;
    }

    /**
     * Pulls the next usable FlowFile from the active queue.
     * <p>
     * Reconstructed from the bytecode listing that the decompiler emitted in place of this method body
     * (the decompiled stub only threw {@link UnsupportedOperationException}).
     * <p>
     * Expired FlowFiles are moved into {@code expiredRecords} and polling continues, up to 10000 expired
     * records in the set. If the head is penalized and the strategy is {@code UNPENALIZED_FLOWFILES},
     * it is put back and {@code null} is returned. The active-queue counters are reduced by
     * {@code expiredRecords.size()} and by the bytes expired during this call.
     * <p>
     * NOTE(review): the public caller in this file invokes this while holding the write lock.
     *
     * @param expiredRecords   collection that receives expired FlowFiles
     * @param expirationMillis expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy     whether penalized FlowFiles may be returned
     * @return the polled FlowFile, or {@code null} if none is available
     */
    private FlowFileRecord doPoll(Set<FlowFileRecord> expiredRecords, long expirationMillis, PollStrategy pollStrategy) {
        migrateSwapToActive();

        FlowFileRecord flowFile;
        boolean expired;
        long expiredBytes = 0L;
        do {
            flowFile = this.activeQueue.poll();

            // isExpired tolerates a null FlowFile and reports it as not expired.
            expired = isExpired(flowFile, expirationMillis);
            if (expired) {
                expiredRecords.add(flowFile);
                expiredBytes += flowFile.getSize();
                flowFile = null;

                // Cap the amount of expiration work done in a single call.
                if (expiredRecords.size() >= 10000) {
                    break;
                }
            } else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                // Head is penalized: put it back and report that nothing is available.
                this.activeQueue.add(flowFile);
                flowFile = null;
                break;
            }
        } while (expired);

        if (!expiredRecords.isEmpty()) {
            incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes);
        }
        return flowFile;
    }

    /**
     * Polls up to {@code i} FlowFiles using the {@code UNPENALIZED_FLOWFILES} strategy.
     *
     * @param i   maximum number of FlowFiles to return
     * @param set collection that receives expired FlowFiles encountered while polling
     * @param j   expiration period in milliseconds; values of zero or less disable expiration
     * @return the polled FlowFiles, possibly empty
     */
    public List<FlowFileRecord> poll(int i, Set<FlowFileRecord> set, long j) {
        final PollStrategy defaultStrategy = PollStrategy.UNPENALIZED_FLOWFILES;
        return poll(i, set, j, defaultStrategy);
    }

    /**
     * Polls up to {@code i} FlowFiles under the write lock; trace logging happens after the lock is
     * released.
     *
     * @param i            maximum number of FlowFiles to return
     * @param set          collection that receives expired FlowFiles encountered while polling
     * @param j            expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy whether penalized FlowFiles may be returned
     * @return the polled FlowFiles, possibly empty
     */
    public List<FlowFileRecord> poll(int i, Set<FlowFileRecord> set, long j, PollStrategy pollStrategy) {
        final List<FlowFileRecord> polled = new ArrayList<>(Math.min(1, i));

        this.writeLock.lock();
        try {
            doPoll(polled, i, set, j, pollStrategy);
        } finally {
            this.writeLock.unlock("poll(int, Set)");
        }

        if (logger.isTraceEnabled() && !polled.isEmpty()) {
            for (FlowFileRecord record : polled) {
                logger.trace("{} poll() returning {}", this, record);
            }
        }
        return polled;
    }

    /**
     * Polls FlowFiles selected by a filter using the {@code UNPENALIZED_FLOWFILES} strategy.
     *
     * @param flowFileFilter filter deciding which FlowFiles to accept and when to stop
     * @param set            collection that receives expired FlowFiles encountered while polling
     * @param j              expiration period in milliseconds; values of zero or less disable expiration
     * @return the accepted FlowFiles, possibly empty
     */
    public List<FlowFileRecord> poll(FlowFileFilter flowFileFilter, Set<FlowFileRecord> set, long j) {
        final PollStrategy defaultStrategy = PollStrategy.UNPENALIZED_FLOWFILES;
        return poll(flowFileFilter, set, j, defaultStrategy);
    }

    /**
     * Polls FlowFiles from the active queue, letting the filter decide which ones to accept.
     * <p>
     * The whole operation runs under the write lock, which is released exactly once in the
     * {@code finally} block. The decompiled form released the lock right after
     * {@code migrateSwapToActive()} and again on every loop iteration, leaving the queue mutation
     * unprotected and unlocking a lock that was no longer held.
     * <p>
     * Expired FlowFiles are moved into {@code set} (polling stops once the set holds 10000 records).
     * Polling also stops at the first penalized FlowFile when the strategy is
     * {@code UNPENALIZED_FLOWFILES}, or when the filter result says not to continue. FlowFiles the filter
     * rejected are returned to the active queue.
     *
     * @param flowFileFilter filter deciding which FlowFiles to accept and when to stop
     * @param set            collection that receives expired FlowFiles encountered while polling
     * @param j              expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy   whether penalized FlowFiles may be returned
     * @return the accepted FlowFiles, possibly empty
     */
    public List<FlowFileRecord> poll(FlowFileFilter flowFileFilter, Set<FlowFileRecord> set, long j, PollStrategy pollStrategy) {
        long acceptedBytes = 0;
        int acceptedCount = 0;
        long expiredBytes = 0;
        int expiredCount = 0;

        this.writeLock.lock();
        try {
            migrateSwapToActive();

            final List<FlowFileRecord> accepted = new ArrayList<>();
            final List<FlowFileRecord> rejected = new ArrayList<>();

            while (true) {
                final FlowFileRecord candidate = this.activeQueue.poll();
                if (candidate == null) {
                    break;
                }

                if (isExpired(candidate, j)) {
                    set.add(candidate);
                    expiredBytes += candidate.getSize();
                    expiredCount++;
                    // Cap the amount of expiration work done in a single call.
                    if (set.size() >= 10000) {
                        break;
                    }
                    continue;
                }

                if (candidate.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                    // Put the penalized head back and stop searching.
                    this.activeQueue.add(candidate);
                    break;
                }

                final FlowFileFilter.FlowFileFilterResult verdict = flowFileFilter.filter(candidate);
                if (verdict.isAccept()) {
                    acceptedBytes += candidate.getSize();
                    acceptedCount++;
                    accepted.add(candidate);
                } else {
                    rejected.add(candidate);
                }
                if (!verdict.isContinue()) {
                    break;
                }
            }

            // Rejected FlowFiles stay queued; accepted ones become unacknowledged.
            this.activeQueue.addAll(rejected);
            unacknowledge(acceptedCount, acceptedBytes);
            if (expiredCount > 0) {
                incrementActiveQueueSize(-expiredCount, -expiredBytes);
            }

            if (!accepted.isEmpty() && logger.isTraceEnabled()) {
                for (FlowFileRecord record : accepted) {
                    logger.trace("{} poll() returning {}", this, record);
                }
            }
            return accepted;
        } finally {
            this.writeLock.unlock("poll(Filter, Set)");
        }
    }

    /**
     * Drains up to {@code i} FlowFiles from the active queue into {@code list}, marks them as
     * unacknowledged, and reduces the active-queue counters by the contents of the expired set.
     * <p>
     * NOTE(review): the expired byte total is computed over everything in {@code set}, including any
     * records it already held on entry - confirm callers always pass a fresh set.
     *
     * @param list         destination for the polled FlowFiles
     * @param i            maximum number of FlowFiles to place in {@code list}
     * @param set          collection that receives expired FlowFiles
     * @param j            expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy whether penalized FlowFiles may be returned
     */
    private void doPoll(List<FlowFileRecord> list, int i, Set<FlowFileRecord> set, long j, PollStrategy pollStrategy) {
        migrateSwapToActive();

        final long drainedBytes = drainQueue(this.activeQueue, list, i, set, j, pollStrategy);

        long expiredBytes = 0;
        for (FlowFileRecord expired : set) {
            expiredBytes += expired.getSize();
        }

        unacknowledge(list.size(), drainedBytes);
        if (!set.isEmpty()) {
            incrementActiveQueueSize(-set.size(), -expiredBytes);
        }
    }

    /**
     * Tells whether a FlowFile's expiration date has passed.
     *
     * @param flowFile the FlowFile to check; {@code null} is treated as not expired
     * @param j        expiration period in milliseconds; values of zero or less disable expiration
     * @return {@code true} if the FlowFile's expiration date is before the current time
     */
    protected boolean isExpired(FlowFile flowFile, long j) {
        final Long expirationDate = getExpirationDate(flowFile, j);
        return isLaterThan(expirationDate);
    }

    /**
     * @param l a timestamp in milliseconds, or {@code null}
     * @return {@code true} if {@code l} is non-null and the current time is strictly after it
     */
    private boolean isLaterThan(Long l) {
        if (l == null) {
            return false;
        }
        return l.longValue() < System.currentTimeMillis();
    }

    /**
     * Computes when a FlowFile expires: its entry date plus the expiration period.
     *
     * @param flowFile the FlowFile, may be {@code null}
     * @param j        expiration period in milliseconds
     * @return the expiration timestamp, or {@code null} if {@code flowFile} is null or {@code j} is not positive
     */
    private Long getExpirationDate(FlowFile flowFile, long j) {
        if (flowFile == null || j <= 0) {
            return null;
        }
        return Long.valueOf(flowFile.getEntryDate() + j);
    }

    /**
     * Moves FlowFiles from {@code queue} into {@code list} until {@code list} holds {@code i} entries,
     * the queue is empty, a penalized FlowFile is hit under {@code UNPENALIZED_FLOWFILES}, or the
     * expired set reaches 10000 records.
     * <p>
     * The returned total covers both the FlowFiles added to {@code list} and the expired ones added to
     * {@code set}, except for the record that triggers an early stop.
     *
     * @param queue        source queue
     * @param list         destination for usable FlowFiles
     * @param i            maximum size of {@code list}
     * @param set          collection that receives expired FlowFiles
     * @param j            expiration period in milliseconds; values of zero or less disable expiration
     * @param pollStrategy whether penalized FlowFiles may be returned
     * @return number of bytes accounted for by the drained FlowFiles
     */
    private long drainQueue(Queue<FlowFileRecord> queue, List<FlowFileRecord> list, int i, Set<FlowFileRecord> set, long j, PollStrategy pollStrategy) {
        long drainedBytes = 0;
        while (list.size() < i) {
            final FlowFileRecord pulled = queue.poll();
            if (pulled == null) {
                break;
            }

            if (isExpired(pulled, j)) {
                set.add(pulled);
                if (set.size() >= 10000) {
                    break;
                }
            } else {
                if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) {
                    // Put the penalized FlowFile back and stop draining.
                    queue.add(pulled);
                    break;
                }
                list.add(pulled);
            }
            drainedBytes += pulled.getSize();
        }
        return drainedBytes;
    }

    /**
     * Looks up a FlowFile in the active queue by its UUID attribute.
     * <p>
     * The read lock is released exactly once, in the {@code finally} block. The decompiled form also
     * unlocked explicitly before the final {@code return null}, so the lock was released twice whenever
     * no match was found.
     *
     * @param str the UUID to search for; {@code null} yields {@code null} without locking
     * @return the matching FlowFile from the active queue, or {@code null} if there is none
     */
    public FlowFileRecord getFlowFile(String str) {
        if (str == null) {
            return null;
        }

        final String uuidKey = CoreAttributes.UUID.key();
        this.readLock.lock();
        try {
            for (FlowFileRecord candidate : this.activeQueue) {
                if (str.equals(candidate.getAttribute(uuidKey))) {
                    return candidate;
                }
            }
            return null;
        } finally {
            this.readLock.unlock("getFlowFile");
        }
    }

    public void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String str) {
        ArrayList arrayList;
        String requestIdentifier = dropFlowFileRequest.getRequestIdentifier();
        this.writeLock.lock();
        try {
            dropFlowFileRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);
            logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, size());
            try {
                arrayList = new ArrayList(this.activeQueue);
            } catch (Exception e) {
                logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", getQueueIdentifier(), e.toString());
                logger.error("", e);
                dropFlowFileRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString());
            }
            try {
                if (dropFlowFileRequest.getState() == DropFlowFileState.CANCELED) {
                    logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                    this.writeLock.unlock("Drop FlowFiles");
                    return;
                }
                QueueSize drop = this.dropAction.drop(arrayList, str);
                logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, drop);
                this.activeQueue.clear();
                incrementActiveQueueSize(-drop.getObjectCount(), -drop.getByteCount());
                dropFlowFileRequest.setCurrentSize(size());
                dropFlowFileRequest.setDroppedSize(dropFlowFileRequest.getDroppedSize().add(drop));
                QueueSize swapQueueSize = getFlowFileQueueSize().swapQueueSize();
                logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", new Object[]{requestIdentifier, Integer.valueOf(this.swapQueue.size()), Integer.valueOf(swapQueueSize.getObjectCount()), Long.valueOf(swapQueueSize.getByteCount())});
                if (dropFlowFileRequest.getState() == DropFlowFileState.CANCELED) {
                    logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                    this.writeLock.unlock("Drop FlowFiles");
                    return;
                }
                try {
                    QueueSize drop2 = this.dropAction.drop(this.swapQueue, str);
                    this.swapQueue.clear();
                    dropFlowFileRequest.setCurrentSize(size());
                    dropFlowFileRequest.setDroppedSize(dropFlowFileRequest.getDroppedSize().add(drop2));
                    this.swapMode = false;
                    incrementSwapQueueSize(-drop2.getObjectCount(), -drop2.getByteCount(), 0);
                    logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, drop2);
                    int size = this.swapLocations.size();
                    Iterator<String> it = this.swapLocations.iterator();
                    while (it.hasNext()) {
                        String next = it.next();
                        SwapContents swapContents = null;
                        try {
                        } catch (IOException e2) {
                            logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", new Object[]{next, getQueueIdentifier(), e2.toString()});
                            logger.error("", e2);
                            if (this.eventReporter != null) {
                                this.eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + next + ". The FlowFiles contained in this Swap File will not be dropped from the queue");
                            }
                            dropFlowFileRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + next + " due to " + e2.toString());
                            if (0 != 0) {
                                this.activeQueue.addAll(swapContents.getFlowFiles());
                            }
                            this.writeLock.unlock("Drop FlowFiles");
                            return;
                        } catch (IncompleteSwapFileException e3) {
                            e3.getPartialContents();
                            String str2 = "Failed to swap in FlowFiles from Swap File " + next + " because the file was corrupt. Some FlowFiles may not be dropped from the queue until NiFi is restarted.";
                            logger.warn(str2);
                            if (this.eventReporter != null) {
                                this.eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", str2);
                            }
                        }
                        if (dropFlowFileRequest.getState() == DropFlowFileState.CANCELED) {
                            logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier);
                            return;
                        }
                        drop2 = this.dropAction.drop(this.swapManager.swapIn(next, this.flowFileQueue).getFlowFiles(), str);
                        dropFlowFileRequest.setDroppedSize(dropFlowFileRequest.getDroppedSize().add(drop2));
                        incrementSwapQueueSize(-drop2.getObjectCount(), -drop2.getByteCount(), -1);
                        dropFlowFileRequest.setCurrentSize(size());
                        it.remove();
                        this.minQueueDateInSwapLocation.remove(next);
                        this.totalQueueDateInSwapLocation.remove(next);
                        logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", new Object[]{requestIdentifier, drop2, next});
                    }
                    logger.debug("Dropped FlowFiles from {} Swap Files", Integer.valueOf(size));
                    logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", new Object[]{Integer.valueOf(dropFlowFileRequest.getDroppedSize().getObjectCount()), Long.valueOf(dropFlowFileRequest.getDroppedSize().getByteCount()), getQueueIdentifier(), str});
                    dropFlowFileRequest.setState(DropFlowFileState.COMPLETE);
                    this.writeLock.unlock("Drop FlowFiles");
                } catch (IOException e4) {
                    logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), e4.toString());
                    logger.error("", e4);
                    dropFlowFileRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e4.toString());
                    this.writeLock.unlock("Drop FlowFiles");
                }
            } catch (IOException e5) {
                logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), e5.toString());
                logger.error("", e5);
                dropFlowFileRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e5.toString());
                this.writeLock.unlock("Drop FlowFiles");
            }
        } finally {
            this.writeLock.unlock("Drop FlowFiles");
        }
    }

    /**
     * Asks the swap manager for the swap locations of this queue's partition, reads the summary of every
     * location that is not already tracked, and adds the recovered totals to the swap portion of the queue size.
     *
     * <p>A location whose summary cannot be read is logged and reported, but is still registered in
     * {@code swapLocations} and still counted in the swap file count.</p>
     *
     * @return a summary aggregating the FlowFile count, byte count, maximum FlowFile id, resource claims,
     *         minimum last-queue date and total last-queue date of the readable swap files, or {@code null}
     *         when the swap locations could not be listed
     */
    public SwapSummary recoverSwappedFlowFiles() {
        int recoveredCount = 0;
        long recoveredBytes = 0;
        long totalLastQueueDate = 0;
        Long minLastQueueDate = null;
        Long maxFlowFileId = null;
        // Raw type kept on purpose: the element type returned by getResourceClaims() is not imported by this file.
        ArrayList resourceClaims = new ArrayList();
        final long startNanos = System.nanoTime();
        int failures = 0;
        int knownSwapFiles = 0;

        this.writeLock.lock();
        try {
            final Set<String> recovered = new LinkedHashSet<>(this.swapManager.recoverSwapLocations(this.flowFileQueue, this.swapPartitionName));
            // Ignore locations this queue is already tracking.
            recovered.removeAll(this.swapLocations);
            logger.debug("Swap Manager reports {} Swap Files for {}: {}", new Object[]{Integer.valueOf(recovered.size()), this.flowFileQueue, recovered});

            for (final String location : recovered) {
                try {
                    final SwapSummary summary = this.swapManager.getSwapSummary(location);
                    final QueueSize summarySize = summary.getQueueSize();

                    final Long summaryMaxId = summary.getMaxFlowFileId();
                    if (summaryMaxId != null && (maxFlowFileId == null || summaryMaxId.longValue() > maxFlowFileId.longValue())) {
                        maxFlowFileId = summaryMaxId;
                    }

                    recoveredCount += summarySize.getObjectCount();
                    recoveredBytes += summarySize.getByteCount();
                    resourceClaims.addAll(summary.getResourceClaims());

                    final Long summaryMinDate = summary.getMinLastQueueDate();
                    final Long summaryTotalDate = summary.getTotalLastQueueDate();
                    this.minQueueDateInSwapLocation.put(location, summaryMinDate);
                    this.totalQueueDateInSwapLocation.put(location, summaryTotalDate);

                    if (minLastQueueDate == null) {
                        minLastQueueDate = summaryMinDate;
                    } else if (summaryMinDate != null) {
                        minLastQueueDate = Long.valueOf(Long.min(minLastQueueDate.longValue(), summaryMinDate.longValue()));
                    }

                    // The min date is treated as nullable above; guard the total the same way so that a
                    // summary without dates cannot abort the whole recovery with a NullPointerException.
                    if (summaryTotalDate != null) {
                        totalLastQueueDate += summaryTotalDate.longValue();
                    }
                } catch (IOException e) {
                    failures++;
                    logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", location);
                    logger.error("", e);
                    if (this.eventReporter != null) {
                        this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + location + "; the file appears to be corrupt. See logs for more details");
                    }
                }
            }

            incrementSwapQueueSize(recoveredCount, recoveredBytes, recovered.size());
            this.swapLocations.addAll(recovered);
            // Snapshot while the write lock is still held; the value is only used for logging below.
            knownSwapFiles = this.swapLocations.size();
        } catch (IOException e) {
            logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getQueueIdentifier());
            logger.error("", e);
            if (this.eventReporter != null) {
                this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + getQueueIdentifier() + "; see logs for more detials");
            }
            return null;
        } finally {
            // Single release point: the lock is unlocked exactly once on every exit path.
            this.writeLock.unlock("Recover Swap Files");
        }

        if (knownSwapFiles == 0) {
            logger.debug("No swap files were recovered for {}", this.flowFileQueue);
        } else {
            logger.info("Recovered {} swap files for {} in {} millis", new Object[]{Integer.valueOf(knownSwapFiles - failures), this, Long.valueOf(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos))});
        }
        return new StandardSwapSummary(new QueueSize(recoveredCount, recoveredBytes), maxFlowFileId, resourceClaims, minLastQueueDate, Long.valueOf(totalLastQueueDate));
    }

    /**
     * Returns the smallest last-queue date found across the active queue, the in-memory swap queue and the
     * per-swap-location minimums, computed while holding the read lock.
     *
     * <p>The value {@code 0} is used as the "nothing found yet" sentinel, so {@code 0} is returned when no
     * date is available from any of the three sources.</p>
     *
     * @return the minimum last-queue date, or {@code 0} when none is known
     */
    public long getMinLastQueueDate() {
        this.readLock.lock();
        try {
            long min = getMinLastQueueDate(this.activeQueue, 0L);

            // Merge the swap queue without letting the 0 sentinel win: when the active queue is empty,
            // Long.min(0, swapQueueMin) would discard the real swap-queue minimum.
            final long swapQueueMin = getMinLastQueueDate(this.swapQueue, 0L);
            if (swapQueueMin != 0L) {
                min = min == 0L ? swapQueueMin : Long.min(min, swapQueueMin);
            }

            for (final Long swapLocationMin : this.minQueueDateInSwapLocation.values()) {
                // A swap location may have been recorded without a minimum date; skip it instead of unboxing null.
                if (swapLocationMin == null) {
                    continue;
                }
                min = min == 0L ? swapLocationMin.longValue() : Long.min(min, swapLocationMin.longValue());
            }
            return min;
        } finally {
            this.readLock.unlock("Get Min Last Queue Date");
        }
    }

    /**
     * Finds the smallest last-queue date among the given records.
     *
     * @param iterable the records to inspect
     * @param j the value to return when no minimum was found (the running minimum is still 0 after the scan)
     * @return the smallest last-queue date, or {@code j} when the scan leaves the running minimum at 0
     */
    private long getMinLastQueueDate(Iterable<FlowFileRecord> iterable, long j) {
        long earliest = 0L;
        final Iterator<FlowFileRecord> records = iterable.iterator();
        while (records.hasNext()) {
            final long queuedAt = records.next().getLastQueueDate().longValue();
            if (earliest == 0L) {
                // 0 doubles as the "no value yet" marker, so the first date is taken as-is.
                earliest = queuedAt;
            } else {
                earliest = Long.min(queuedAt, earliest);
            }
        }
        if (earliest == 0L) {
            return j;
        }
        return earliest;
    }

    /**
     * Sums, under the read lock, how long every FlowFile has been queued relative to the supplied time.
     *
     * <p>Records in the active queue and the in-memory swap queue contribute {@code j - lastQueueDate} each.
     * Records counted as swapped but not present in the swap queue contribute as a group:
     * their count multiplied by {@code j}, minus the sum of the per-swap-location totals.</p>
     *
     * @param j the reference time that each last-queue date is subtracted from
     * @return the accumulated queued duration
     */
    public long getTotalQueuedDuration(long j) {
        this.readLock.lock();
        try {
            long inMemoryDuration = 0L;
            for (final FlowFileRecord active : this.activeQueue) {
                inMemoryDuration += j - active.getLastQueueDate().longValue();
            }
            for (final FlowFileRecord swapped : this.swapQueue) {
                inMemoryDuration += j - swapped.getLastQueueDate().longValue();
            }

            long swapFileDateSum = 0L;
            for (final Long locationTotal : this.totalQueueDateInSwapLocation.values()) {
                swapFileDateSum += locationTotal.longValue();
            }

            // Swapped records that are not in the in-memory swap queue.
            final int swapFileRecordCount = getFlowFileQueueSize().getSwappedCount() - this.swapQueue.size();
            return inMemoryDuration + ((swapFileRecordCount * j) - swapFileDateSum);
        } finally {
            this.readLock.unlock("Get Total Queued Duration");
        }
    }

    /**
     * Adds the given deltas to the active count and active bytes of the queue size, retrying until
     * {@link #updateSize} accepts the replacement. All other size fields are carried over unchanged.
     *
     * @param i delta applied to the active FlowFile count (may be negative)
     * @param j delta applied to the active byte count (may be negative)
     */
    protected void incrementActiveQueueSize(int i, long j) {
        while (true) {
            final FlowFileQueueSize current = this.size.get();
            final FlowFileQueueSize replacement = new FlowFileQueueSize(
                    current.getActiveCount() + i,
                    current.getActiveBytes() + j,
                    current.getSwappedCount(),
                    current.getSwappedBytes(),
                    current.getSwapFileCount(),
                    current.getUnacknowledgedCount(),
                    current.getUnacknowledgedBytes());
            if (updateSize(current, replacement)) {
                // Only the update that was actually applied is checked for negative values.
                logIfNegative(current, replacement, "active");
                return;
            }
        }
    }

    /**
     * Adds the given deltas to the swapped count, swapped bytes and swap file count of the queue size,
     * retrying until {@link #updateSize} accepts the replacement. All other size fields are carried over unchanged.
     *
     * @param i delta applied to the swapped FlowFile count (may be negative)
     * @param j delta applied to the swapped byte count (may be negative)
     * @param i2 delta applied to the swap file count (may be negative)
     */
    private void incrementSwapQueueSize(int i, long j, int i2) {
        for (;;) {
            final FlowFileQueueSize before = getFlowFileQueueSize();
            final FlowFileQueueSize after = new FlowFileQueueSize(
                    before.getActiveCount(),
                    before.getActiveBytes(),
                    before.getSwappedCount() + i,
                    before.getSwappedBytes() + j,
                    before.getSwapFileCount() + i2,
                    before.getUnacknowledgedCount(),
                    before.getUnacknowledgedBytes());
            if (!updateSize(before, after)) {
                // The size changed since it was read; rebuild from the fresh value.
                continue;
            }
            logIfNegative(before, after, "swap");
            return;
        }
    }

    /**
     * Moves the given count and byte total from the active portion of the queue size to the unacknowledged
     * portion. The unacknowledged size is increased first, then the active size is decreased; these are two
     * separate size updates.
     *
     * @param i number of FlowFiles to move
     * @param j number of bytes to move
     */
    private void unacknowledge(int i, long j) {
        directlyIncrementUnacknowledgedQueueSize(i, j);

        final int activeCountDelta = -i;
        final long activeBytesDelta = -j;
        incrementActiveQueueSize(activeCountDelta, activeBytesDelta);
    }

    /**
     * Adds the given deltas to the unacknowledged count and bytes of the queue size without touching the
     * active or swapped portions, retrying until {@link #updateSize} accepts the replacement.
     *
     * @param i delta applied to the unacknowledged FlowFile count (may be negative)
     * @param j delta applied to the unacknowledged byte count (may be negative)
     */
    private void directlyIncrementUnacknowledgedQueueSize(int i, long j) {
        FlowFileQueueSize observed;
        FlowFileQueueSize proposed;
        do {
            observed = this.size.get();
            proposed = new FlowFileQueueSize(
                    observed.getActiveCount(),
                    observed.getActiveBytes(),
                    observed.getSwappedCount(),
                    observed.getSwappedBytes(),
                    observed.getSwapFileCount(),
                    observed.getUnacknowledgedCount() + i,
                    observed.getUnacknowledgedBytes() + j);
        } while (!updateSize(observed, proposed));

        // Reached only after the update was applied, with the pair of values that was actually swapped in.
        logIfNegative(observed, proposed, "Unacknowledged");
    }

    /**
     * Logs an error, with a synthetic exception for its stack trace, when the new queue size contains a
     * negative active, swapped or unacknowledged count or byte value. Nothing is thrown.
     *
     * @param flowFileQueueSize the size before the update (used only in the log message)
     * @param flowFileQueueSize2 the size after the update (the value that is checked)
     * @param str label naming which portion of the queue was updated
     */
    private void logIfNegative(FlowFileQueueSize flowFileQueueSize, FlowFileQueueSize flowFileQueueSize2, String str) {
        final boolean activeNegative = flowFileQueueSize2.getActiveBytes() < 0 || flowFileQueueSize2.getActiveCount() < 0;
        final boolean swappedNegative = flowFileQueueSize2.getSwappedBytes() < 0 || flowFileQueueSize2.getSwappedCount() < 0;
        final boolean unacknowledgedNegative = flowFileQueueSize2.getUnacknowledgedBytes() < 0 || flowFileQueueSize2.getUnacknowledgedCount() < 0;

        if (!(activeNegative || swappedNegative || unacknowledgedNegative)) {
            return;
        }

        final String message = "Updated Size of Queue " + str + " from " + flowFileQueueSize + " to " + flowFileQueueSize2;
        logger.error(message, new RuntimeException("Cannot create negative queue size"));
    }

    /**
     * Replaces the current queue size with the new one via {@code compareAndSet} on the size reference.
     *
     * @param flowFileQueueSize the size the caller expects to be current
     * @param flowFileQueueSize2 the size to install
     * @return the result of the {@code compareAndSet} call; {@code true} when the new size was installed
     */
    protected boolean updateSize(FlowFileQueueSize flowFileQueueSize, FlowFileQueueSize flowFileQueueSize2) {
        final boolean swapped = this.size.compareAndSet(flowFileQueueSize, flowFileQueueSize2);
        return swapped;
    }

    /**
     * Returns the queue size object currently held by the size reference.
     *
     * @return the current {@link FlowFileQueueSize}
     */
    public FlowFileQueueSize getFlowFileQueueSize() {
        final FlowFileQueueSize current = this.size.get();
        return current;
    }

    /**
     * Takes over the contents of another queue while holding the write lock: the active FlowFiles are added
     * through {@code putAll}, the swap locations are appended to this queue's swap locations, and the swap
     * portion of the queue size is increased by the inherited swap size and swap location count.
     *
     * @param flowFileQueueContents the contents to absorb
     */
    public void inheritQueueContents(FlowFileQueueContents flowFileQueueContents) {
        this.writeLock.lock();
        try {
            putAll(flowFileQueueContents.getActiveFlowFiles());

            final List<String> inheritedLocations = flowFileQueueContents.getSwapLocations();
            this.swapLocations.addAll(inheritedLocations);

            final QueueSize inheritedSwapSize = flowFileQueueContents.getSwapSize();
            incrementSwapQueueSize(inheritedSwapSize.getObjectCount(), inheritedSwapSize.getByteCount(), inheritedLocations.size());

            if (inheritedLocations.isEmpty()) {
                return;
            }
            logger.debug("Inherited the following swap locations: {}", inheritedLocations);
        } finally {
            this.writeLock.unlock("inheritQueueContents");
        }
    }

    /**
     * Empties this queue, under the write lock, and returns everything it held so it can be handed to
     * another partition.
     *
     * <p>Each swap location is re-labelled for the new partition through the swap manager; a location whose
     * re-labelling fails is logged and left out of the result. The active queue and the in-memory swap queue
     * are both returned as FlowFiles. The queue size is reset to zero except for the unacknowledged portion.</p>
     *
     * @param str the name of the partition that will own the contents
     * @return the FlowFiles, the re-labelled swap locations, and the swapped size remaining after the
     *         in-memory swap queue has been subtracted
     */
    public FlowFileQueueContents packageForRebalance(String str) {
        this.writeLock.lock();
        try {
            final List<FlowFileRecord> flowFiles = new ArrayList<>(this.activeQueue);

            final List<String> relabelledLocations = new ArrayList<>(this.swapLocations.size());
            for (final String location : this.swapLocations) {
                try {
                    relabelledLocations.add(this.swapManager.changePartitionName(location, str));
                } catch (IOException e) {
                    logger.error("Failed to update Swap File {} to reflect that the contents are now owned by Partition '{}'", new Object[]{location, str, e});
                }
            }
            this.swapLocations.clear();
            this.activeQueue.clear();

            // Measure the in-memory swap queue before folding it into the returned FlowFiles.
            final int swapQueueCount = this.swapQueue.size();
            long swapQueueBytes = 0L;
            for (final FlowFileRecord swapped : this.swapQueue) {
                swapQueueBytes += swapped.getSize();
            }
            flowFiles.addAll(this.swapQueue);
            this.swapQueue.clear();
            this.swapMode = false;

            QueueSize swapFileSize;
            while (true) {
                final FlowFileQueueSize observed = getFlowFileQueueSize();
                swapFileSize = new QueueSize(observed.getSwappedCount() - swapQueueCount, observed.getSwappedBytes() - swapQueueBytes);
                final FlowFileQueueSize emptied = new FlowFileQueueSize(0, 0L, 0, 0L, 0, observed.getUnacknowledgedCount(), observed.getUnacknowledgedBytes());
                if (updateSize(observed, emptied)) {
                    break;
                }
            }

            logger.debug("Cleared {} to package FlowFile for rebalance to {}", this, str);
            return new FlowFileQueueContents(flowFiles, relabelledLocations, swapFileSize);
        } finally {
            this.writeLock.unlock("packageForRebalance(SwappablePriorityQueue)");
        }
    }

    /**
     * Describes this queue by the identifier of its owning FlowFile queue and its swap partition name.
     *
     * @return a string of the form {@code SwappablePriorityQueue[queueId=..., partition=...]}
     */
    @Override
    public String toString() {
        final StringBuilder description = new StringBuilder("SwappablePriorityQueue[queueId=");
        description.append(this.flowFileQueue.getIdentifier());
        description.append(", partition=");
        description.append(this.swapPartitionName);
        description.append("]");
        return description.toString();
    }
}
