package org.apache.nifi.controller.repository;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ReconstitutedSerializedRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.repository.encryption.configuration.EncryptionProtocol;
import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SyncListener;
import org.wali.WriteAheadRepository;

/* loaded from: input_file:org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.class */
public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
    // Property-key prefix: the constructor treats every property whose key starts with this value as a repository directory.
    static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
    // Property key naming the Write-Ahead Log implementation class to use.
    private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
    // Property key controlling whether orphaned FlowFiles are retained; treated as true when the property is absent.
    private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
    // Property key for the size handed to the field cache created in initialize(ResourceClaimManager).
    private static final String FLOWFILE_REPO_CACHE_SIZE = "nifi.flowfile.repository.wal.cache.characters";
    // Fully-qualified class names accepted as values of WRITE_AHEAD_LOG_IMPL.
    static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
    static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
    private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
    private static final String DEFAULT_WAL_IMPLEMENTATION = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
    // Fallback used when FLOWFILE_REPO_CACHE_SIZE is not configured.
    private static final int DEFAULT_CACHE_SIZE = 10000000;
    // Configured WAL implementation class name; null when the no-arg constructor was used.
    private final String walImplementation;
    protected final NiFiProperties nifiProperties;
    private final AtomicLong flowFileSequenceGenerator;
    // Passed as the force-sync flag by updateRepository(Collection).
    private final boolean alwaysSync;
    private final boolean retainOrphanedFlowFiles;
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
    // Cancelled (without interruption) by close() when non-null.
    volatile ScheduledFuture<?> checkpointFuture;
    private final long checkpointDelayMillis;
    // Directories handed to the active WAL; created on disk by initialize(...).
    private final List<File> flowFileRepositoryPaths;
    // Every configured repository directory; scanned for "partition-" entries when migrating from the minimal-locking WAL.
    private final List<File> recoveryFiles;
    private final ScheduledExecutorService checkpointExecutor;
    private final int maxCharactersToCache;
    // Records recovered from the WAL, cached by findQueuesWithFlowFiles so they are only recovered once.
    private volatile Collection<SerializedRepositoryRecord> recoveredRecords;
    private final Set<ResourceClaim> orphanedResourceClaims;
    // Normalized swap-location suffixes currently considered valid; every access synchronizes on this set.
    private final Set<String> swapLocationSuffixes;
    private WriteAheadRepository<SerializedRepositoryRecord> wal;
    private RepositoryRecordSerdeFactory serdeFactory;
    private ResourceClaimManager claimManager;
    private FieldCache fieldCache;
    // Resource claims waiting to be marked destructable, keyed by the int returned from wal.update(...);
    // drained by onSync(int) and onGlobalSync().
    private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> claimsAwaitingDestruction;

    /**
     * Creates an unconfigured repository: no properties, no WAL implementation name and no
     * checkpoint executor. Orphaned FlowFiles are retained, syncing on every update is off,
     * and the cache size and checkpoint delay are zero.
     */
    public WriteAheadFlowFileRepository() {
        // Settings that the other constructor derives from NiFiProperties get fixed fallbacks here.
        this.nifiProperties = null;
        this.walImplementation = null;
        this.alwaysSync = false;
        this.retainOrphanedFlowFiles = true;
        this.maxCharactersToCache = 0;
        this.checkpointDelayMillis = 0L;
        this.checkpointExecutor = null;

        // Mutable bookkeeping state always starts out empty.
        this.flowFileSequenceGenerator = new AtomicLong(0L);
        this.flowFileRepositoryPaths = new ArrayList<>();
        this.recoveryFiles = new ArrayList<>();
        this.recoveredRecords = null;
        this.orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
        this.swapLocationSuffixes = new HashSet<>();
        this.claimsAwaitingDestruction = new ConcurrentHashMap<>();
    }

    /**
     * Creates a repository configured from the given properties.
     *
     * <p>Every property whose key starts with {@code FLOWFILE_REPOSITORY_DIRECTORY_PREFIX} is
     * recorded as a recovery directory. A sequential-access WAL uses only the directory named by
     * the bare prefix key; any other implementation uses all recovery directories.
     *
     * @param niFiProperties the properties to read the repository configuration from
     */
    public WriteAheadFlowFileRepository(NiFiProperties niFiProperties) {
        this.nifiProperties = niFiProperties;

        this.flowFileSequenceGenerator = new AtomicLong(0L);
        this.flowFileRepositoryPaths = new ArrayList<>();
        this.recoveryFiles = new ArrayList<>();
        this.recoveredRecords = null;
        this.orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
        this.swapLocationSuffixes = new HashSet<>();
        this.claimsAwaitingDestruction = new ConcurrentHashMap<>();

        this.alwaysSync = Boolean.parseBoolean(niFiProperties.getProperty("nifi.flowfile.repository.always.sync", "false"));

        // Retention defaults to true when the property is not present at all.
        final String retainSetting = niFiProperties.getProperty(RETAIN_ORPHANED_FLOWFILES);
        if (retainSetting == null) {
            this.retainOrphanedFlowFiles = true;
        } else {
            this.retainOrphanedFlowFiles = Boolean.parseBoolean(retainSetting);
        }

        final String configuredWal = niFiProperties.getProperty(WRITE_AHEAD_LOG_IMPL);
        if (configuredWal == null) {
            this.walImplementation = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
        } else {
            this.walImplementation = configuredWal;
        }

        this.maxCharactersToCache = niFiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, Integer.valueOf(DEFAULT_CACHE_SIZE)).intValue();

        for (String key : niFiProperties.getPropertyKeys()) {
            if (!key.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
                continue;
            }
            this.recoveryFiles.add(new File(niFiProperties.getProperty(key)));
        }

        if (!isSequentialAccessWAL(this.walImplementation)) {
            this.flowFileRepositoryPaths.addAll(this.recoveryFiles);
        } else {
            this.flowFileRepositoryPaths.add(new File(niFiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)));
        }

        this.checkpointDelayMillis = FormatUtils.getTimeDuration(niFiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
        this.checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Tells whether the given implementation name is one of the sequential-access Write-Ahead Logs
     * (plain or encrypted).
     *
     * @param str the configured WAL implementation class name; may be {@code null}
     * @return {@code true} for either sequential-access implementation name, {@code false} otherwise,
     *         including for {@code null}
     */
    private static boolean isSequentialAccessWAL(String str) {
        // Constant-first comparison: the no-arg constructor leaves the implementation name null,
        // and callers such as findResourceClaimReferences pass it straight in.
        return "org.apache.nifi.wali.SequentialAccessWriteAheadLog".equals(str) || ENCRYPTED_SEQUENTIAL_ACCESS_WAL.equals(str);
    }

    /**
     * Initializes the repository with a newly created field cache, sized by the configured
     * maximum number of characters to cache, and the serde factory built by
     * {@link #createSerdeFactory}.
     *
     * @param resourceClaimManager the claim manager used by this repository
     * @throws IOException if the underlying initialization fails
     */
    public void initialize(ResourceClaimManager resourceClaimManager) throws IOException {
        final CaffeineFieldCache cache = new CaffeineFieldCache(this.maxCharactersToCache);
        final RepositoryRecordSerdeFactory factory = createSerdeFactory(resourceClaimManager, cache);
        initialize(resourceClaimManager, factory, cache);
    }

    /**
     * Chooses the serde factory for this repository.
     *
     * <p>The encrypted factory is used when the configured WAL implementation property equals the
     * {@code EncryptedSequentialAccessWriteAheadLog} class name, or when the
     * {@code nifi.repository.encryption.protocol.version} property equals the version number of
     * {@code EncryptionProtocol.VERSION_1}. Otherwise the standard factory is used.
     *
     * @param resourceClaimManager the claim manager handed to the factory
     * @param fieldCache the field cache handed to the factory
     * @return the serde factory to use
     */
    protected RepositoryRecordSerdeFactory createSerdeFactory(ResourceClaimManager resourceClaimManager, FieldCache fieldCache) {
        final String versionOne = Integer.toString(EncryptionProtocol.VERSION_1.getVersionNumber());
        final boolean protocolVersionConfigured = versionOne.equals(this.nifiProperties.getProperty("nifi.repository.encryption.protocol.version"));

        final String encryptedWalName = EncryptedSequentialAccessWriteAheadLog.class.getName();
        final boolean encryptedWalConfigured = encryptedWalName.equals(this.nifiProperties.getProperty(WRITE_AHEAD_LOG_IMPL));

        if (encryptedWalConfigured || protocolVersionConfigured) {
            logger.info("Creating Encrypted FlowFile Repository [{}]", EncryptedSequentialAccessWriteAheadLog.class.getName());
            return new EncryptedRepositoryRecordSerdeFactory(resourceClaimManager, this.nifiProperties, fieldCache);
        }
        return new StandardRepositoryRecordSerdeFactory(resourceClaimManager, fieldCache);
    }

    /**
     * Initializes the repository: stores the collaborators, creates the repository directories
     * and instantiates the configured Write-Ahead Log.
     *
     * <p>Both sequential-access implementation names produce a {@code SequentialAccessWriteAheadLog}
     * over the first repository path; the minimal-locking name produces a
     * {@code MinimalLockingWriteAheadLog} over all repository paths with a single partition.
     *
     * @param resourceClaimManager the claim manager used by this repository
     * @param repositoryRecordSerdeFactory the serde factory handed to the WAL
     * @param fieldCache the field cache used by this repository
     * @throws IOException if a repository directory cannot be created or the WAL cannot be created
     * @throws IllegalStateException if the configured WAL implementation is not a recognized value
     */
    public void initialize(ResourceClaimManager resourceClaimManager, RepositoryRecordSerdeFactory repositoryRecordSerdeFactory, FieldCache fieldCache) throws IOException {
        this.claimManager = resourceClaimManager;
        this.fieldCache = fieldCache;

        for (final File repositoryPath : this.flowFileRepositoryPaths) {
            Files.createDirectories(repositoryPath.toPath());
        }

        // Assigned only after the directories exist, so a failure above leaves it untouched.
        this.serdeFactory = repositoryRecordSerdeFactory;

        if (isSequentialAccessWAL(this.walImplementation)) {
            final File primaryPath = this.flowFileRepositoryPaths.get(0);
            this.wal = new SequentialAccessWriteAheadLog<>(primaryPath, repositoryRecordSerdeFactory, this);
        } else if (this.walImplementation.equals(MINIMAL_LOCKING_WALI)) {
            final SortedSet<java.nio.file.Path> partitionPaths = this.flowFileRepositoryPaths.stream()
                    .map(File::toPath)
                    .collect(Collectors.toCollection(TreeSet::new));
            this.wal = new MinimalLockingWriteAheadLog<>(partitionPaths, 1, repositoryRecordSerdeFactory, this);
        } else {
            throw new IllegalStateException("Cannot create Write-Ahead Log because the configured property 'nifi.flowfile.repository.wal.implementation' has an invalid value of '" + this.walImplementation + "'. Please update nifi.properties to indicate a valid value for this property.");
        }

        logger.info("Initialized FlowFile Repository");
    }

    /**
     * Releases the resources held by this repository: cancels the pending checkpoint task (without
     * interrupting it if it is already running), shuts down the checkpoint executor and shuts down
     * the Write-Ahead Log.
     *
     * <p>Each resource is skipped when it was never created, so closing an instance built with the
     * no-arg constructor, or one that was never initialized, does not fail.
     *
     * @throws IOException if shutting down the Write-Ahead Log fails
     */
    public void close() throws IOException {
        final ScheduledFuture<?> pendingCheckpoint = this.checkpointFuture;
        if (pendingCheckpoint != null) {
            pendingCheckpoint.cancel(false);
        }

        // The no-arg constructor leaves the executor null; without this guard the WAL below
        // would never be shut down.
        if (this.checkpointExecutor != null) {
            this.checkpointExecutor.shutdown();
        }

        // The WAL only exists after initialize(...) has run.
        if (this.wal != null) {
            this.wal.shutdown();
        }
    }

    /**
     * Reports whether this repository is volatile.
     *
     * @return always {@code false}
     */
    public boolean isVolatile() {
        return false;
    }

    /**
     * Finds, for each of the given resource claims, the FlowFiles and swap files that reference it.
     *
     * <p>A snapshot of the Write-Ahead Log is captured and both its records and its swap locations
     * are inspected. A swap file whose summary cannot be read is logged and skipped.
     *
     * @param set the resource claims of interest
     * @param flowFileSwapManager used to resolve the queue and the summary of each swap location
     * @return a map from resource claim to its references, containing only claims that have at least
     *         one reference; {@code null} when the configured WAL is not a sequential-access one
     */
    public Map<ResourceClaim, Set<ResourceClaimReference>> findResourceClaimReferences(Set<ResourceClaim> set, FlowFileSwapManager flowFileSwapManager) {
        if (!isSequentialAccessWAL(this.walImplementation)) {
            return null;
        }

        final Map<ResourceClaim, Set<ResourceClaimReference>> referencesByClaim = new HashMap<>();
        final SnapshotCapture<SerializedRepositoryRecord> snapshot = this.wal.captureSnapshot();

        // References held by FlowFiles that are present in the snapshot's records.
        for (final SerializedRepositoryRecord record : snapshot.getRecords().values()) {
            final ContentClaim contentClaim = record.getContentClaim();
            if (contentClaim == null) {
                continue;
            }
            final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
            if (!set.contains(resourceClaim)) {
                continue;
            }
            referencesByClaim.computeIfAbsent(resourceClaim, key -> new HashSet<>()).add(createResourceClaimReference(record));
        }

        // References held by swap files: one shared reference object per swap location.
        for (final String swapLocation : snapshot.getSwapLocations()) {
            final ResourceClaimReference swapReference = createResourceClaimReference(swapLocation, flowFileSwapManager.getQueueIdentifier(swapLocation));
            try {
                for (final ResourceClaim swappedClaim : flowFileSwapManager.getSwapSummary(swapLocation).getResourceClaims()) {
                    if (set.contains(swappedClaim)) {
                        referencesByClaim.computeIfAbsent(swappedClaim, key -> new HashSet<>()).add(swapReference);
                    }
                }
            } catch (Exception e) {
                logger.warn("Failed to read swap file " + swapLocation + " when attempting to find resource claim references", e);
            }
        }

        return referencesByClaim;
    }

    /**
     * Builds a reference describing a swap file that holds a resource claim.
     *
     * <p>The reference reports itself as swapped out and has no FlowFile UUID. Equality and hash
     * code are based on the queue identifier and the swap location.
     *
     * @param str the swap location
     * @param str2 the identifier of the queue the swap file belongs to
     * @return the swap-file reference
     */
    private ResourceClaimReference createResourceClaimReference(final String str, final String str2) {
        final String swapLocation = str;
        final String queueId = str2;

        return new ResourceClaimReference() {
            public String getQueueIdentifier() {
                return queueId;
            }

            public boolean isSwappedOut() {
                return true;
            }

            public String getFlowFileUuid() {
                return null;
            }

            public String getSwapLocation() {
                return swapLocation;
            }

            public String toString() {
                return "Swap File[location=" + swapLocation + ", queue=" + queueId + "]";
            }

            public int hashCode() {
                return Objects.hash(queueId, swapLocation);
            }

            public boolean equals(Object obj) {
                if (obj == this) {
                    return true;
                }
                if (obj == null || obj.getClass() != getClass()) {
                    return false;
                }
                final ResourceClaimReference other = (ResourceClaimReference) obj;
                if (!Objects.equals(queueId, other.getQueueIdentifier())) {
                    return false;
                }
                return Objects.equals(swapLocation, other.getSwapLocation());
            }
        };
    }

    /**
     * Builds a reference describing a FlowFile in the repository that holds a resource claim.
     *
     * <p>The reference reports itself as not swapped out and has no swap location. Equality and
     * hash code are based on the queue identifier and the FlowFile's UUID attribute.
     *
     * @param serializedRepositoryRecord the record whose queue identifier and UUID are captured
     * @return the FlowFile reference
     */
    private ResourceClaimReference createResourceClaimReference(SerializedRepositoryRecord serializedRepositoryRecord) {
        final String queueId = serializedRepositoryRecord.getQueueIdentifier();
        final String flowFileUuid = serializedRepositoryRecord.getFlowFileRecord().getAttribute(CoreAttributes.UUID.key());

        return new ResourceClaimReference() {
            public String getQueueIdentifier() {
                return queueId;
            }

            public boolean isSwappedOut() {
                return false;
            }

            public String getFlowFileUuid() {
                return flowFileUuid;
            }

            public String getSwapLocation() {
                return null;
            }

            public String toString() {
                return "FlowFile[uuid=" + flowFileUuid + ", queue=" + queueId + "]";
            }

            public int hashCode() {
                return Objects.hash(queueId, flowFileUuid);
            }

            public boolean equals(Object obj) {
                if (obj == this) {
                    return true;
                }
                if (obj == null || obj.getClass() != getClass()) {
                    return false;
                }
                final ResourceClaimReference other = (ResourceClaimReference) obj;
                if (!Objects.equals(queueId, other.getQueueIdentifier())) {
                    return false;
                }
                return Objects.equals(flowFileUuid, other.getFlowFileUuid());
            }
        };
    }

    /**
     * Sums the total space of the file store behind each repository path.
     *
     * @return the combined total space in bytes; {@code 0} when there are no repository paths
     * @throws IOException if a file store cannot be resolved or queried
     */
    public long getStorageCapacity() throws IOException {
        long totalBytes = 0;
        for (final File repositoryPath : this.flowFileRepositoryPaths) {
            totalBytes += Files.getFileStore(repositoryPath.toPath()).getTotalSpace();
        }
        return totalBytes;
    }

    /**
     * Sums the usable space of the file store behind each repository path.
     *
     * @return the combined usable space in bytes; {@code 0} when there are no repository paths
     * @throws IOException if a file store cannot be resolved or queried
     */
    public long getUsableStorageSpace() throws IOException {
        long usableBytes = 0;
        for (final File repositoryPath : this.flowFileRepositoryPaths) {
            usableBytes += Files.getFileStore(repositoryPath.toPath()).getUsableSpace();
        }
        return usableBytes;
    }

    /**
     * Returns the name of the file store that holds the first repository path.
     *
     * @return the file store name, or {@code null} if the file store cannot be resolved
     */
    public String getFileStoreName() {
        final File firstPath = this.flowFileRepositoryPaths.iterator().next();
        try {
            return Files.getFileStore(firstPath.toPath()).name();
        } catch (IOException e) {
            // Best effort: an unresolvable file store is reported as "no name".
            return null;
        }
    }

    /**
     * Writes the given records to the repository, forcing a sync only when this repository is
     * configured to always sync.
     *
     * @param collection the records to persist
     * @throws IOException if the Write-Ahead Log cannot be updated
     */
    public void updateRepository(Collection<RepositoryRecord> collection) throws IOException {
        final boolean forceSync = this.alwaysSync;
        updateRepository(collection, forceSync);
    }

    /**
     * Tells the claim manager that the given resource claim may be destroyed.
     *
     * @param resourceClaim the claim to mark; ignored when {@code null}
     */
    private void markDestructable(ResourceClaim resourceClaim) {
        if (resourceClaim != null) {
            this.claimManager.markDestructable(resourceClaim);
        }
    }

    /**
     * Decides whether the resource claim behind a content claim can be destroyed.
     *
     * @param contentClaim the content claim to inspect; may be {@code null}
     * @return {@code true} only when the content claim has a resource claim that is not in use
     */
    private boolean isDestructable(ContentClaim contentClaim) {
        if (contentClaim == null) {
            return false;
        }
        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
        return resourceClaim != null && !resourceClaim.isInUse();
    }

    /**
     * Checks whether the given swap location, once normalized, is among the swap locations
     * currently tracked by this repository.
     *
     * @param str the swap location (or suffix) to check
     * @return {@code true} if the normalized location is tracked
     */
    public boolean isValidSwapLocationSuffix(String str) {
        final String normalized = normalizeSwapLocation(str);
        synchronized (this.swapLocationSuffixes) {
            return this.swapLocationSuffixes.contains(normalized);
        }
    }

    /**
     * Validates the records, writes them to the Write-Ahead Log and then updates content-claim
     * bookkeeping.
     *
     * <p>Records of type {@code CLEANUP_TRANSIENT_CLAIMS} are not written to the log, but they are
     * still passed on to {@link #updateContentClaims}.
     *
     * @param collection the records to persist
     * @param z whether the Write-Ahead Log should be forced to sync
     * @throws IOException if the Write-Ahead Log cannot be updated
     * @throws IllegalArgumentException if a record that requires a destination has none
     */
    private void updateRepository(Collection<RepositoryRecord> collection, boolean z) throws IOException {
        // Validate everything up front so that nothing is written when any record is malformed.
        for (final RepositoryRecord record : collection) {
            final RepositoryRecordType type = record.getType();
            final boolean destinationOptional = type == RepositoryRecordType.DELETE
                    || type == RepositoryRecordType.CONTENTMISSING
                    || type == RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS;
            if (!destinationOptional && record.getDestination() == null) {
                throw new IllegalArgumentException("Record " + record + " has no destination and Type is " + record.getType());
            }
        }

        final int partitionIndex = this.wal.update(
                collection.stream()
                        .filter(record -> record.getType() != RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS)
                        .map(LiveSerializedRepositoryRecord::new)
                        .collect(Collectors.toList()),
                z);

        updateContentClaims(collection, partitionIndex);
    }

    /**
     * Updates claimant counts and swap-location tracking for a batch of records that has been
     * written to the Write-Ahead Log, and queues resource claims that are no longer needed.
     *
     * <p>Queued claims are not marked destructable here; they are stored under the given index and
     * handed to the claim manager by {@link #onSync(int)} or {@link #onGlobalSync()}.
     *
     * @param collection the records that were written
     * @param i the value returned by the Write-Ahead Log update, used as the key under which
     *          claims awaiting destruction are queued
     */
    protected void updateContentClaims(Collection<RepositoryRecord> collection, int i) {
        final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
        final Set<String> swappedOutLocations = new HashSet<>();
        final Set<String> swappedInLocations = new HashSet<>();

        for (final RepositoryRecord record : collection) {
            // Claimant counts must be adjusted before asking whether a claim is still in use.
            updateClaimCounts(record);

            final RepositoryRecordType type = record.getType();
            if (type == RepositoryRecordType.DELETE) {
                final ContentClaim current = record.getCurrentClaim();
                if (current != null && isDestructable(current)) {
                    claimsToDestroy.add(current.getResourceClaim());
                }
                final ContentClaim original = record.getOriginalClaim();
                if (original != null && !original.equals(record.getCurrentClaim()) && isDestructable(original)) {
                    claimsToDestroy.add(original.getResourceClaim());
                }
            } else if (type == RepositoryRecordType.UPDATE) {
                final ContentClaim original = record.getOriginalClaim();
                // Reference comparison, unlike the equals() check used for DELETE above.
                if (original != null && record.getCurrentClaim() != original && isDestructable(original)) {
                    claimsToDestroy.add(original.getResourceClaim());
                }
            } else if (type == RepositoryRecordType.SWAP_OUT) {
                // Within one batch the most recent swap event for a location wins.
                final String location = record.getSwapLocation();
                swappedOutLocations.add(location);
                swappedInLocations.remove(location);
            } else if (type == RepositoryRecordType.SWAP_IN) {
                final String location = record.getSwapLocation();
                swappedInLocations.add(location);
                swappedOutLocations.remove(location);
            }
        }

        // Transient claims are considered only after every record's counts have been updated.
        for (final RepositoryRecord record : collection) {
            final List<ContentClaim> transientClaims = record.getTransientClaims();
            if (transientClaims == null) {
                continue;
            }
            for (final ContentClaim transientClaim : transientClaims) {
                if (isDestructable(transientClaim)) {
                    claimsToDestroy.add(transientClaim.getResourceClaim());
                }
            }
        }

        if (!(swappedOutLocations.isEmpty() && swappedInLocations.isEmpty())) {
            synchronized (this.swapLocationSuffixes) {
                for (final String location : swappedInLocations) {
                    this.swapLocationSuffixes.remove(normalizeSwapLocation(location));
                }
                for (final String location : swappedOutLocations) {
                    this.swapLocationSuffixes.add(normalizeSwapLocation(location));
                }
            }
        }

        if (!claimsToDestroy.isEmpty()) {
            final BlockingQueue<ResourceClaim> pending =
                    this.claimsAwaitingDestruction.computeIfAbsent(Integer.valueOf(i), key -> new LinkedBlockingQueue<>());
            pending.addAll(claimsToDestroy);
        }
    }

    /**
     * Decrements claimant counts for the claims a record no longer needs: the current claim when
     * the record is a {@code DELETE} or {@code CONTENTMISSING}, and the original claim when the
     * record's content was modified.
     *
     * @param repositoryRecord the record whose claims are adjusted
     */
    private void updateClaimCounts(RepositoryRecord repositoryRecord) {
        final RepositoryRecordType type = repositoryRecord.getType();
        final boolean flowFileGone = type == RepositoryRecordType.DELETE || type == RepositoryRecordType.CONTENTMISSING;
        if (flowFileGone) {
            decrementClaimCount(repositoryRecord.getCurrentClaim());
        }

        if (repositoryRecord.isContentModified()) {
            decrementClaimCount(repositoryRecord.getOriginalClaim());
        }
    }

    /**
     * Decrements the claimant count of the resource claim behind the given content claim.
     *
     * @param contentClaim the content claim; ignored when {@code null}
     */
    private void decrementClaimCount(ContentClaim contentClaim) {
        if (contentClaim != null) {
            this.claimManager.decrementClaimantCount(contentClaim.getResourceClaim());
        }
    }

    /**
     * Reduces a swap location to a canonical key: backslashes become forward slashes, a single
     * trailing slash is dropped (unless the whole value is one character), only the last path
     * segment is kept, and everything from the first {@code '.'} of that segment onward is removed.
     *
     * @param str the swap location; may be {@code null}
     * @return the normalized location, or {@code null} when the input is {@code null}
     */
    protected static String normalizeSwapLocation(String str) {
        if (str == null) {
            return null;
        }

        String location = str.replace('\\', '/');
        if (location.length() > 1 && location.endsWith("/")) {
            location = location.substring(0, location.length() - 1);
        }

        final String lastSegment = getLocationSuffix(location);
        return StringUtils.substringBefore(lastSegment, ".");
    }

    /**
     * Returns the part of the given location that follows its last {@code '/'}.
     *
     * @param str the location, using forward slashes as separators
     * @return the text after the last slash; the input itself when it contains no slash or when
     *         the last slash is its final character
     */
    private static String getLocationSuffix(String str) {
        final int lastSlash = str.lastIndexOf('/');
        final boolean hasTextAfterSlash = lastSlash >= 0 && lastSlash < str.length() - 1;
        if (!hasTextAfterSlash) {
            return str;
        }
        return str.substring(lastSlash + 1);
    }

    /**
     * Marks as destructable every resource claim that was queued under the given index.
     *
     * @param i the index whose queued claims are drained; nothing happens when no queue exists for it
     */
    public void onSync(int i) {
        final BlockingQueue<ResourceClaim> pending = this.claimsAwaitingDestruction.get(Integer.valueOf(i));
        if (pending == null) {
            return;
        }

        // Draining into a set collapses claims that were queued more than once.
        final HashSet<ResourceClaim> drained = new HashSet<>();
        pending.drainTo(drained);
        for (final ResourceClaim claim : drained) {
            markDestructable(claim);
        }
    }

    /**
     * Marks as destructable every resource claim that is queued under any index.
     */
    public void onGlobalSync() {
        for (final BlockingQueue<ResourceClaim> pending : this.claimsAwaitingDestruction.values()) {
            // Each queue is drained into its own set, collapsing claims queued more than once.
            final HashSet<ResourceClaim> drained = new HashSet<>();
            pending.drainTo(drained);
            for (final ResourceClaim claim : drained) {
                markDestructable(claim);
            }
        }
    }

    /**
     * Records in the repository that the given FlowFiles have been swapped out to a swap file,
     * forcing the Write-Ahead Log to sync, and starts tracking the swap location.
     *
     * @param list the FlowFiles that were swapped out; nothing happens when {@code null} or empty
     * @param flowFileQueue the queue the FlowFiles were swapped out from
     * @param str the location of the swap file
     * @throws IOException if the Write-Ahead Log cannot be updated
     */
    public void swapFlowFilesOut(List<FlowFileRecord> list, FlowFileQueue flowFileQueue, String str) throws IOException {
        if (list == null || list.isEmpty()) {
            return;
        }

        this.wal.update(
                list.stream()
                        .map(flowFile -> new LiveSerializedRepositoryRecord(new StandardRepositoryRecord(flowFileQueue, flowFile, str)))
                        .collect(Collectors.toList()),
                true);

        final String normalized = normalizeSwapLocation(str);
        synchronized (this.swapLocationSuffixes) {
            this.swapLocationSuffixes.add(normalized);
        }

        logger.info("Successfully swapped out {} FlowFiles from {} to Swap File {}", list.size(), flowFileQueue, str);
    }

    /**
     * Records in the repository that the given FlowFiles have been swapped back in to a queue,
     * forcing the Write-Ahead Log to sync, and stops tracking the swap location.
     *
     * @param str the location of the swap file the FlowFiles came from
     * @param list the FlowFiles that were swapped in
     * @param flowFileQueue the queue the FlowFiles were swapped in to
     * @throws IOException if the Write-Ahead Log cannot be updated
     */
    public void swapFlowFilesIn(String str, List<FlowFileRecord> list, FlowFileQueue flowFileQueue) throws IOException {
        updateRepository(
                list.stream()
                        .map(flowFile -> {
                            final StandardRepositoryRecord swappedIn = new StandardRepositoryRecord(flowFileQueue, flowFile);
                            swappedIn.setSwapLocation(str);
                            swappedIn.setDestination(flowFileQueue);
                            return swappedIn;
                        })
                        .collect(Collectors.toList()),
                true);

        final String normalized = normalizeSwapLocation(str);
        synchronized (this.swapLocationSuffixes) {
            this.swapLocationSuffixes.remove(normalized);
        }

        logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", list.size(), flowFileQueue);
    }

    /**
     * Deletes the given file or directory together with everything beneath it.
     *
     * <p>Deletion is best effort: anything that cannot be removed is logged with a warning so that
     * it can be cleaned up manually, and no exception is thrown. Symbolic links are removed
     * themselves; their targets are never descended into. A path that does not exist is silently
     * ignored.
     *
     * @param file the file or directory to delete
     */
    void deleteRecursively(File file) {
        if (file.isDirectory() && !Files.isSymbolicLink(file.toPath())) {
            // listFiles() returns null when the directory cannot be read.
            final File[] children = file.listFiles();
            if (children != null) {
                for (final File child : children) {
                    if (child.isDirectory() && !Files.isSymbolicLink(child.toPath())) {
                        // A non-empty subdirectory cannot be removed by File.delete() alone.
                        deleteRecursively(child);
                    } else if (!child.delete()) {
                        logger.warn("Failed to delete old file {}; this file should be cleaned up manually", child);
                    }
                }
            }
        }

        // Only complain when something is actually left behind.
        if (!file.delete() && file.exists()) {
            logger.warn("Failed to delete old directory {}; this directory should be cleaned up manually", file);
        }
    }

    /**
     * Migrates a repository that was written with the Sequential Access Write-Ahead Log into the
     * given Write-Ahead Log.
     *
     * <p>The records are recovered from the old log, the old log is shut down, the records are
     * written to the new log with a forced sync, and only then are the old log's {@code journals}
     * directory and {@code checkpoint} / {@code checkpoint.partial} files deleted.
     *
     * @param writeAheadRepository the Write-Ahead Log to migrate the recovered records into
     * @return the recovered records, or an empty {@code Optional} when the configured repository
     *         directory does not exist
     * @throws IOException if recovery from the old log or the update of the new log fails
     */
    private Optional<Collection<SerializedRepositoryRecord>> migrateFromSequentialAccessLog(WriteAheadRepository<SerializedRepositoryRecord> writeAheadRepository) throws IOException {
        final File repositoryDir = new File(this.nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX));
        if (!repositoryDir.exists()) {
            return Optional.empty();
        }

        final SequentialAccessWriteAheadLog<SerializedRepositoryRecord> legacyWal = new SequentialAccessWriteAheadLog<>(repositoryDir, this.serdeFactory, this);
        logger.info("Encountered FlowFile Repository that was written using the Sequential Access Write Ahead Log. Will recover from this version.");

        final Collection<SerializedRepositoryRecord> recovered;
        try {
            recovered = legacyWal.recoverRecords();
        } finally {
            // Shut the old log down exactly once, whether or not recovery succeeded.
            legacyWal.shutdown();
        }

        writeAheadRepository.update(recovered, true);
        logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new Write-Ahead Log. Will now delete old files.");

        deleteRecursively(new File(repositoryDir, "journals"));

        final File checkpoint = new File(repositoryDir, "checkpoint");
        if (!checkpoint.delete() && checkpoint.exists()) {
            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", checkpoint);
        }

        final File partialCheckpoint = new File(repositoryDir, "checkpoint.partial");
        if (!partialCheckpoint.delete() && partialCheckpoint.exists()) {
            logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialCheckpoint);
        }

        return Optional.of(recovered);
    }

    /**
     * Migrates a repository that was written with the Minimal Locking Write-Ahead Log into the
     * given Write-Ahead Log.
     *
     * <p>Every recovery directory is scanned for entries whose name starts with {@code partition-}.
     * When any are found, the records are recovered from the old log, the old log is shut down, the
     * records are written to the new log with a forced sync, and only then are the partition
     * directories and the {@code snapshot} / {@code snapshot.partial} files deleted.
     *
     * @param writeAheadRepository the Write-Ahead Log to migrate the recovered records into
     * @return the recovered records, or an empty {@code Optional} when no partition directories exist
     * @throws IOException if recovery from the old log or the update of the new log fails
     */
    private Optional<Collection<SerializedRepositoryRecord>> migrateFromMinimalLockingLog(WriteAheadRepository<SerializedRepositoryRecord> writeAheadRepository) throws IOException {
        final List<File> partitionDirs = new ArrayList<>();
        for (final File recoveryDir : this.recoveryFiles) {
            final File[] partitions = recoveryDir.listFiles(candidate -> candidate.getName().startsWith("partition-"));
            // listFiles() returns null when the directory is missing or unreadable; such a
            // directory simply contributes no partitions.
            if (partitions != null) {
                partitionDirs.addAll(Arrays.asList(partitions));
            }
        }

        if (partitionDirs.isEmpty()) {
            return Optional.empty();
        }

        logger.info("Encountered FlowFile Repository that was written using the 'Minimal Locking Write-Ahead Log'. Will recover from this version and re-write the repository using the new version of the Write-Ahead Log.");

        final SortedSet<java.nio.file.Path> recoveryPaths = this.recoveryFiles.stream()
                .map(File::toPath)
                .collect(Collectors.toCollection(TreeSet::new));
        final MinimalLockingWriteAheadLog<SerializedRepositoryRecord> legacyWal =
                new MinimalLockingWriteAheadLog<>(recoveryPaths, partitionDirs.size(), this.serdeFactory, (SyncListener) null);

        final Collection<SerializedRepositoryRecord> recovered;
        try {
            recovered = legacyWal.recoverRecords();
        } finally {
            // Shut the old log down exactly once, whether or not recovery succeeded.
            legacyWal.shutdown();
        }

        writeAheadRepository.update(recovered, true);
        logger.info("Successfully recovered files from existing Write-Ahead Log and transitioned to new implementation. Will now delete old files.");

        for (final File partitionDir : partitionDirs) {
            deleteRecursively(partitionDir);
        }

        for (final File recoveryDir : this.recoveryFiles) {
            final File snapshot = new File(recoveryDir, "snapshot");
            if (!snapshot.delete() && snapshot.exists()) {
                logger.warn("Failed to delete old file {}; this file should be cleaned up manually", snapshot);
            }

            final File partialSnapshot = new File(recoveryDir, "snapshot.partial");
            if (!partialSnapshot.delete() && partialSnapshot.exists()) {
                logger.warn("Failed to delete old file {}; this file should be cleaned up manually", partialSnapshot);
            }
        }

        return Optional.of(recovered);
    }

    /**
     * Determines which queues have FlowFiles in the repository, either as recovered
     * {@code CREATE} / {@code UPDATE} records or in recovered swap files.
     *
     * <p>The records recovered from the Write-Ahead Log are cached on first use so that they are
     * not recovered again.
     *
     * @param flowFileSwapManager used to resolve the queue identifier of each recovered swap location
     * @return the identifiers of the queues that have FlowFiles
     * @throws IOException if the records cannot be recovered from the Write-Ahead Log
     */
    public Set<String> findQueuesWithFlowFiles(FlowFileSwapManager flowFileSwapManager) throws IOException {
        Collection<SerializedRepositoryRecord> records = this.recoveredRecords;
        if (records == null) {
            records = this.wal.recoverRecords();
            this.recoveredRecords = records;
        }

        final Set<String> queueIds = new HashSet<>();
        for (final SerializedRepositoryRecord record : records) {
            final RepositoryRecordType type = record.getType();
            final boolean livesInQueue = type == RepositoryRecordType.CREATE || type == RepositoryRecordType.UPDATE;
            if (!livesInQueue) {
                continue;
            }
            final String queueId = record.getQueueIdentifier();
            if (queueId != null) {
                queueIds.add(queueId);
            }
        }

        for (final String swapLocation : this.wal.getRecoveredSwapLocations()) {
            queueIds.add(flowFileSwapManager.getQueueIdentifier(swapLocation));
        }

        return queueIds;
    }

    /**
     * Restores recovered repository records into the queues supplied by {@code queueProvider}
     * and then schedules the recurring checkpoint task.
     * <p>
     * For every recovered record:
     * <ul>
     *   <li>no queue identifier: a DELETE record is queued for the write-ahead log;</li>
     *   <li>queue identifier with no matching queue: either a DELETE record is queued, or (when
     *       orphan retention is enabled) the record is left in the repository and any content
     *       claim it holds is counted and remembered in {@code orphanedResourceClaims};</li>
     *   <li>otherwise: the claimant count of its content claim (if any) is incremented and the
     *       FlowFile is put on the matching queue.</li>
     * </ul>
     * Afterwards the sequence generator is set to one more than the largest record identifier seen.
     *
     * @param queueProvider supplies the queues that recovered FlowFiles are put on
     * @return the largest record identifier among the processed records, or 0 if none was greater than 0
     * @throws IOException declared for the write-ahead-log calls made here (recover, migrate, update)
     */
    public long loadFlowFiles(QueueProvider queueProvider) throws IOException {
        // Use the records already held in this.recoveredRecords when present; otherwise read the log now.
        Collection<SerializedRepositoryRecord> records;
        if (this.recoveredRecords != null) {
            records = this.recoveredRecords;
        } else {
            records = this.wal.recoverRecords();
        }

        Set<String> swapLocations = this.wal.getRecoveredSwapLocations();
        synchronized (this.swapLocationSuffixes) {
            swapLocations.forEach(location -> this.swapLocationSuffixes.add(normalizeSwapLocation(location)));
            logger.debug("Recovered {} Swap Files: {}", this.swapLocationSuffixes.size(), this.swapLocationSuffixes);
        }

        // Nothing recovered from the configured log: fall back to the migrate* helper for the
        // other write-ahead-log implementation, defaulting to an empty list.
        if (records == null || records.isEmpty()) {
            if (isSequentialAccessWAL(this.walImplementation)) {
                records = migrateFromMinimalLockingLog(this.wal).orElse(new ArrayList<>());
            } else {
                records = migrateFromSequentialAccessLog(this.wal).orElse(new ArrayList<>());
            }
        }

        this.fieldCache.clear();

        // Index the known queues by identifier for lookup while walking the records.
        Map<String, FlowFileQueue> queuesById = new HashMap<>();
        for (FlowFileQueue knownQueue : queueProvider.getAllQueues()) {
            queuesById.put(knownQueue.getIdentifier(), knownQueue);
        }

        List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
        int missingQueueCount = 0;
        long maxId = 0L;
        for (SerializedRepositoryRecord recovered : records) {
            long recordId = this.serdeFactory.getRecordIdentifier(recovered).longValue();
            maxId = Math.max(maxId, recordId);

            String queueId = recovered.getQueueIdentifier();
            if (queueId == null) {
                missingQueueCount++;
                logger.warn("Encountered Repository Record (id={}) with no Queue Identifier. Dropping this FlowFile", recordId);
                dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
                        .flowFileRecord(recovered.getFlowFileRecord())
                        .swapLocation(recovered.getSwapLocation())
                        .type(RepositoryRecordType.DELETE)
                        .build());
                continue;
            }

            ContentClaim contentClaim = recovered.getContentClaim();
            FlowFileQueue targetQueue = queuesById.get(queueId);
            if (targetQueue != null) {
                // Normal case: count the claim (if any) and hand the FlowFile to its queue.
                if (contentClaim != null) {
                    this.claimManager.incrementClaimantCount(contentClaim.getResourceClaim());
                }
                targetQueue.put(recovered.getFlowFileRecord());
                continue;
            }

            // From here on the record names a queue that the provider does not know about.
            missingQueueCount++;
            if (!isRetainOrphanedFlowFiles()) {
                dropRecords.add(new ReconstitutedSerializedRepositoryRecord.Builder()
                        .flowFileRecord(recovered.getFlowFileRecord())
                        .swapLocation(recovered.getSwapLocation())
                        .type(RepositoryRecordType.DELETE)
                        .build());
                logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will be dropped.", recordId, queueId);
                continue;
            }

            if (contentClaim == null) {
                logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored.", recordId, queueId);
                continue;
            }

            // Retained orphan that still references content: count the claim and remember it as orphaned.
            this.claimManager.incrementClaimantCount(contentClaim.getResourceClaim());
            this.orphanedResourceClaims.add(contentClaim.getResourceClaim());
            logger.warn("Encountered Repository Record (id={}) with Queue identifier {} but no Queue exists with that ID. This FlowFile will not be restored to any FlowFile Queue in the flow. However, it will remain in the FlowFile Repository in case the flow containing this queue is later restored. This may result in the following Content Claim not being cleaned up by the Content Repository: {}", recordId, queueId, contentClaim);
        }

        // The stashed records have been consumed; drop the reference.
        this.recoveredRecords = null;
        this.flowFileSequenceGenerator.set(maxId + 1);

        logger.info("Successfully restored {} FlowFiles and {} Swap Files", records.size() - missingQueueCount, swapLocations.size());
        if (missingQueueCount > 0) {
            logger.warn("On recovery, found {} FlowFiles whose queues no longer exists.", missingQueueCount);
        }

        if (!dropRecords.isEmpty()) {
            long updateStart = System.nanoTime();
            this.wal.update(dropRecords, true);
            int dropCount = dropRecords.size();
            long updateMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - updateStart);
            logger.info("Successfully updated FlowFile Repository with {} Drop Records due to missing queues in {} milliseconds", dropCount, updateMillis);
        } else {
            logger.debug("No Drop Records to update Repository with");
        }

        // Recurring checkpoint; the initial delay and the delay between runs are both checkpointDelayMillis.
        this.checkpointFuture = this.checkpointExecutor.scheduleWithFixedDelay(() -> {
            try {
                logger.info("Initiating checkpoint of FlowFile Repository");
                long checkpointStart = System.nanoTime();
                int checkpointedCount = checkpoint();
                long checkpointMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStart);
                logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds", checkpointedCount, checkpointMillis);
            } catch (Throwable failure) {
                // Nothing may escape: an exception thrown from a periodic task suppresses its later executions.
                logger.error("Unable to checkpoint FlowFile Repository due to " + failure.toString(), failure);
            }
        }, this.checkpointDelayMillis, this.checkpointDelayMillis, TimeUnit.MILLISECONDS);

        return maxId;
    }

    /**
     * Reports the value of the {@code retainOrphanedFlowFiles} flag.
     *
     * @return {@code true} when the flag is set; {@code loadFlowFiles} then keeps records whose
     *         queue is missing instead of queuing DELETE records for them
     */
    private boolean isRetainOrphanedFlowFiles() {
        final boolean retain = retainOrphanedFlowFiles;
        return retain;
    }

    /**
     * Exposes the resource claims recorded in {@code orphanedResourceClaims}.
     *
     * @return an unmodifiable view of the backing set; callers cannot modify it through this view
     */
    public Set<ResourceClaim> findOrphanedResourceClaims() {
        final Set<ResourceClaim> readOnlyView = Collections.unmodifiableSet(orphanedResourceClaims);
        return readOnlyView;
    }

    /**
     * Raises the sequence generator to {@code j} if its current value is lower; never lowers it.
     * <p>
     * Implemented as a compare-and-set retry loop on {@code flowFileSequenceGenerator}: the loop
     * ends as soon as the observed value is already at least {@code j} or the swap succeeds.
     *
     * @param j the value the sequence generator must be at least equal to afterwards
     */
    public void updateMaxFlowFileIdentifier(long j) {
        while (true) {
            final long observed = this.flowFileSequenceGenerator.get();
            // Short-circuit keeps the original order: compare first, attempt the swap only if lower.
            if (observed >= j || this.flowFileSequenceGenerator.compareAndSet(observed, j)) {
                return;
            }
        }
    }

    /**
     * Hands out the current value of the sequence generator and atomically advances it by one.
     *
     * @return the generator's value before the increment
     */
    public long getNextFlowFileSequence() {
        final long next = flowFileSequenceGenerator.getAndIncrement();
        return next;
    }

    /**
     * Reports one less than the current value of the sequence generator.
     *
     * @return {@code flowFileSequenceGenerator.get() - 1}
     * @throws IOException declared in the signature; nothing in this body throws it
     */
    public long getMaxFlowFileIdentifier() throws IOException {
        final long nextSequence = flowFileSequenceGenerator.get();
        return nextSequence - 1;
    }

    /**
     * Checkpoints the underlying write-ahead log by delegating to {@code wal.checkpoint()}.
     *
     * @return the value returned by {@code wal.checkpoint()}; the scheduled task in
     *         {@code loadFlowFiles} logs it as a record count
     * @throws IOException if the write-ahead log's checkpoint call throws it
     */
    public int checkpoint() throws IOException {
        final int result = wal.checkpoint();
        return result;
    }
}
