/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.provenance.index.lucene;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.AsyncLineageSubmission;
import org.apache.nifi.provenance.AsyncQuerySubmission;
import org.apache.nifi.provenance.ProgressiveResult;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StandardLineageResult;
import org.apache.nifi.provenance.StandardQueryResult;
import org.apache.nifi.provenance.authorization.EventAuthorizer;
import org.apache.nifi.provenance.authorization.EventTransformer;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.index.lucene.CachedQuery;
import org.apache.nifi.provenance.index.lucene.CommitPreference;
import org.apache.nifi.provenance.index.lucene.ConvertEventToLuceneDocument;
import org.apache.nifi.provenance.index.lucene.EventIndexTask;
import org.apache.nifi.provenance.index.lucene.IndexDirectoryManager;
import org.apache.nifi.provenance.index.lucene.IndexableDocument;
import org.apache.nifi.provenance.index.lucene.LatestEventsPerProcessorQuery;
import org.apache.nifi.provenance.index.lucene.LatestEventsQuery;
import org.apache.nifi.provenance.index.lucene.LuceneCacheWarmer;
import org.apache.nifi.provenance.index.lucene.MigrateDefunctIndex;
import org.apache.nifi.provenance.index.lucene.QueryTask;
import org.apache.nifi.provenance.index.lucene.StoredDocument;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.provenance.store.EventStore;
import org.apache.nifi.provenance.util.DirectoryUtils;
import org.apache.nifi.provenance.util.NamedThreadFactory;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.util.timebuffer.EntityAccess;
import org.apache.nifi.util.timebuffer.LongEntityAccess;
import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LuceneEventIndex
implements EventIndex {
    private static final Logger logger = LoggerFactory.getLogger(LuceneEventIndex.class);
    // Category passed to the EventReporter when this class reports a problem (see reindexEvents).
    private static final String EVENT_CATEGORY = "Provenance Repository";
    // NOTE(review): the two constants below are not referenced in the portion of the class reviewed here.
    public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
    public static final int MAX_DELETE_INDEX_WAIT_SECONDS = 30;
    // Presumably the source of the literal 1000 passed to lineage QueryTasks as the result limit - confirm.
    public static final int MAX_LINEAGE_NODES = 1000;
    // Same value as the cap applied by the constructor to the number of indexing tasks.
    public static final int MAX_INDEX_THREADS = 100;
    // Same value as the limit on FlowFile UUIDs enforced when a lineage computation is submitted.
    public static final int MAX_LINEAGE_UUIDS = 100;
    // Submitted queries, keyed by the query identifier (see submitQuery).
    private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<String, AsyncQuerySubmission>();
    // Submitted lineage computations, keyed by the submission's lineage identifier.
    private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<String, AsyncLineageSubmission>();
    // Bounded hand-off queue (capacity 1000): addEvent offers documents, every EventIndexTask is constructed with it.
    private final BlockingQueue<StoredDocument> documentQueue = new LinkedBlockingQueue<StoredDocument>(1000);
    // Tasks created by the constructor and submitted to indexExecutor; close() asks each one to shut down.
    private final List<EventIndexTask> indexTasks = Collections.synchronizedList(new ArrayList());
    // Runs QueryTasks and cached-query lookups; stopped with shutdownNow() in close().
    private final ExecutorService queryExecutor;
    // Runs the EventIndexTasks.
    private final ExecutorService indexExecutor;
    private final RepositoryConfiguration config;
    private final IndexManager indexManager;
    // Converts a provenance event plus its storage summary into a Lucene Document; convert() may return null.
    private final ConvertEventToLuceneDocument eventConverter;
    private final IndexDirectoryManager directoryManager;
    // Set by close(); the enqueue loop in addEvent stops retrying once this is true.
    private volatile boolean closed = false;
    // Statistics buffers (constructed with TimeUnit.SECONDS and 300); incrementAndReportStats() aggregates
    // them over the last five minutes. queuePauseNanos records time spent in the timed offer to documentQueue,
    // eventsIndexed records the batch sizes passed to addEvents.
    private final TimedBuffer<TimestampedLong> queuePauseNanos = new TimedBuffer(TimeUnit.SECONDS, 300, (EntityAccess)new LongEntityAccess());
    private final TimedBuffer<TimestampedLong> eventsIndexed = new TimedBuffer(TimeUnit.SECONDS, 300, (EntityAccess)new LongEntityAccess());
    // Number of documents successfully placed on documentQueue by addEvent.
    private final AtomicLong eventCount = new AtomicLong(0L);
    private final EventReporter eventReporter;
    // Populated in initialize(); updated for every event in addEvent/reindexEvents and consulted first by submitQuery.
    // NOTE(review): plain ArrayList - confirm initialize() always completes before other threads read it.
    private final List<CachedQuery> cachedQueries = new ArrayList<CachedQuery>();
    // Also registered in cachedQueries; kept separately for getLatestCachedEvent.
    private LatestEventsPerProcessorQuery latestEventsPerProcessorQuery;
    // Created in initialize(); remains null until then, which is why close() null-checks it.
    private ScheduledExecutorService maintenanceExecutor;
    private ScheduledExecutorService cacheWarmerExecutor;
    // Supplied through initialize().
    private EventStore eventStore;
    // Set to true by triggerReindexOfDefunctIndices() when the newest index directory is among the defunct ones.
    private volatile boolean newestIndexDefunct = false;

    /**
     * Creates an event index, delegating to the four-argument constructor with
     * {@code maxEventsPerCommit} fixed at 1,000,000.
     *
     * @param config        repository configuration
     * @param indexManager  manager used to borrow and return index readers/writers
     * @param eventReporter sink for problems that should be surfaced to the user
     */
    public LuceneEventIndex(RepositoryConfiguration config, IndexManager indexManager, EventReporter eventReporter) {
        this(config, indexManager, 1_000_000, eventReporter);
    }

    /**
     * Creates the executors and the directory manager, then starts the indexing tasks.
     * The number of indexing tasks is the configured index thread pool size, capped at 100
     * (a warning is logged when the cap applies).
     *
     * @param config             repository configuration
     * @param indexManager       manager used to borrow and return index readers/writers
     * @param maxEventsPerCommit value handed to every {@link EventIndexTask}
     * @param eventReporter      sink for problems that should be surfaced to the user
     */
    public LuceneEventIndex(RepositoryConfiguration config, IndexManager indexManager, int maxEventsPerCommit, EventReporter eventReporter) {
        this.eventReporter = eventReporter;
        this.queryExecutor = Executors.newFixedThreadPool(config.getQueryThreadPoolSize(), new NamedThreadFactory("Provenance Query"));
        this.indexExecutor = Executors.newFixedThreadPool(config.getIndexThreadPoolSize(), new NamedThreadFactory("Index Provenance Events"));
        this.cacheWarmerExecutor = Executors.newScheduledThreadPool(config.getStorageDirectories().size(), new NamedThreadFactory("Warm Lucene Index", true));
        this.directoryManager = new IndexDirectoryManager(config);

        // Decide how many indexing tasks to start, never more than 100.
        final int requestedThreads = config.getIndexThreadPoolSize();
        final boolean overLimit = requestedThreads > 100;
        if (overLimit) {
            logger.warn("The Provenance Repository is configured to perform indexing of events using {} threads. This number exceeds the maximum allowable number of threads, which is {}. Will proceed using {} threads. This value is limited because the performance of indexing will decrease and startup times will increase when setting this value too high.", requestedThreads, 100, 100);
        }
        final int taskCount = overLimit ? 100 : requestedThreads;

        int started = 0;
        while (started < taskCount) {
            final EventIndexTask indexTask = new EventIndexTask(this.documentQueue, indexManager, this.directoryManager, maxEventsPerCommit, eventReporter);
            this.indexTasks.add(indexTask);
            this.indexExecutor.submit(indexTask);
            started++;
        }

        this.config = config;
        this.indexManager = indexManager;
        this.eventConverter = new ConvertEventToLuceneDocument(config.getSearchableFields(), config.getSearchableAttributes());
    }

    /**
     * Wires in the event store, initializes the directory manager, schedules the periodic
     * maintenance and query-purging jobs, registers the cached queries and finally triggers
     * re-indexing of defunct indices and cache warming.
     *
     * @param eventStore the store from which events are read
     */
    @Override
    public void initialize(EventStore eventStore) {
        this.eventStore = eventStore;
        directoryManager.initialize();

        // Maintenance runs every minute, obsolete queries are purged every 30 seconds.
        final ScheduledExecutorService maintenance = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Provenance Repository Maintenance"));
        maintenanceExecutor = maintenance;
        maintenance.scheduleWithFixedDelay(this::performMaintenance, 1L, 1L, TimeUnit.MINUTES);
        maintenance.scheduleWithFixedDelay(this::purgeObsoleteQueries, 30L, 30L, TimeUnit.SECONDS);

        cachedQueries.add(new LatestEventsQuery());
        final LatestEventsPerProcessorQuery perProcessorQuery = new LatestEventsPerProcessorQuery();
        latestEventsPerProcessorQuery = perProcessorQuery;
        cachedQueries.add(perProcessorQuery);

        triggerReindexOfDefunctIndices();
        triggerCacheWarming();
    }

    /**
     * Detects index directories that cannot be opened and submits a {@link MigrateDefunctIndex}
     * task for each of them to a dedicated two-thread executor. A defunct Lucene 8 directory
     * whose Lucene 4 counterpart also exists is only counted, not rebuilt. Afterwards records
     * whether the newest index directory is among the defunct ones.
     */
    private void triggerReindexOfDefunctIndices() {
        final ScheduledExecutorService rebuildExecutor = Executors.newScheduledThreadPool(2, new NamedThreadFactory("Rebuild Defunct Provenance Indices", true));

        final List<File> sortedDirectories = directoryManager.getAllIndexDirectories(true, true);
        sortedDirectories.sort(DirectoryUtils.OLDEST_INDEX_FIRST);

        final List<File> defunctDirectories = detectDefunctIndices(sortedDirectories);
        final AtomicInteger rebuildCount = new AtomicInteger(0);
        final int totalCount = defunctDirectories.size();

        for (final File defunctIndex : defunctDirectories) {
            try {
                if (!isLucene4IndexPresent(defunctIndex)) {
                    logger.info("Determined that Lucene Index Directory {} is defunct. Will destroy and rebuild index", defunctIndex);
                    final Tuple<Long, Long> timeRange = getTimeRange(defunctIndex, sortedDirectories);
                    rebuildExecutor.submit(new MigrateDefunctIndex(defunctIndex, indexManager, directoryManager, timeRange.getKey(), timeRange.getValue(),
                        eventStore, eventReporter, eventConverter, rebuildCount, totalCount));
                } else {
                    logger.info("Encountered Lucene 8 index {} and also the corresponding Lucene 4 index; will only trigger rebuilding of one directory.", defunctIndex);
                    rebuildCount.incrementAndGet();
                }
            } catch (final Exception e) {
                logger.error("Detected defunct index {} but failed to rebuild index", defunctIndex, e);
            }
        }

        // No further tasks will be submitted; already-submitted ones still run.
        rebuildExecutor.shutdown();

        if (sortedDirectories.isEmpty()) {
            return;
        }
        final File newestDirectory = sortedDirectories.get(sortedDirectories.size() - 1);
        if (defunctDirectories.contains(newestDirectory)) {
            newestIndexDefunct = true;
        }
    }

    /**
     * Checks whether a Lucene 8 index directory has a sibling directory carrying the legacy
     * name, i.e. the part of the directory name starting at {@code "index-"}.
     *
     * @param indexDirectory the index directory to inspect
     * @return {@code true} only if the directory name contains {@code "lucene-8-"}, a legacy
     *         name can be derived from it, and that legacy directory exists next to it
     */
    private boolean isLucene4IndexPresent(File indexDirectory) {
        final String indexName = indexDirectory.getName();
        if (!indexName.contains("lucene-8-")) {
            return false;
        }

        final int prefixEnd = indexName.indexOf("index-");
        if (prefixEnd < 0) {
            // Without the "index-" marker no legacy name can be derived; substring(-1) would throw.
            return false;
        }

        final String oldIndexName = indexName.substring(prefixEnd);
        return new File(indexDirectory.getParentFile(), oldIndexName).exists();
    }

    /**
     * Schedules one {@link LuceneCacheWarmer} per configured storage directory, provided a
     * positive warm-cache frequency is configured. Each warmer first runs after one minute and
     * then repeats with the configured number of minutes between runs.
     */
    private void triggerCacheWarming() {
        final Optional<Integer> configuredFrequency = config.getWarmCacheFrequencyMinutes();
        if (!configuredFrequency.isPresent()) {
            return;
        }
        if (configuredFrequency.get() <= 0) {
            return;
        }

        for (final File storageDirectory : config.getStorageDirectories().values()) {
            final int delayMinutes = configuredFrequency.get();
            final LuceneCacheWarmer warmer = new LuceneCacheWarmer(storageDirectory, indexManager);
            cacheWarmerExecutor.scheduleWithFixedDelay(warmer, 1L, delayMinutes, TimeUnit.MINUTES);
        }
    }

    /**
     * Determines the time range covered by an index directory.
     * The start is the directory's own index timestamp. The end is the timestamp of the
     * directory that follows it in {@code sortedIndexDirectories}, or the current time when
     * no following directory can be determined. If the directory is not in the list, the end
     * is the first listed directory's timestamp when that is later than the start, otherwise
     * the current time.
     *
     * @param indexDirectory         the directory whose range is wanted
     * @param sortedIndexDirectories index directories; callers in this class pass them ordered oldest first
     * @return tuple of (start timestamp, end timestamp)
     */
    protected static Tuple<Long, Long> getTimeRange(File indexDirectory, List<File> sortedIndexDirectories) {
        final long rangeStart = DirectoryUtils.getIndexTimestamp(indexDirectory);
        if (sortedIndexDirectories.isEmpty()) {
            return new Tuple<Long, Long>(rangeStart, System.currentTimeMillis());
        }

        final int position = sortedIndexDirectories.indexOf(indexDirectory);
        final int lastPosition = sortedIndexDirectories.size() - 1;
        final long rangeEnd;
        if (position < 0) {
            // Directory is not part of the list: it can only be bounded by the first listed index.
            final long firstListedTimestamp = DirectoryUtils.getIndexTimestamp(sortedIndexDirectories.get(0));
            rangeEnd = rangeStart < firstListedTimestamp ? firstListedTimestamp : System.currentTimeMillis();
        } else if (position >= lastPosition) {
            // Last directory in the list: nothing follows it.
            rangeEnd = System.currentTimeMillis();
        } else {
            rangeEnd = DirectoryUtils.getIndexTimestamp(sortedIndexDirectories.get(position + 1));
        }
        return new Tuple<Long, Long>(rangeStart, rangeEnd);
    }

    /**
     * Filters the given directories down to those for which {@code isIndexDefunct} reports true,
     * preserving iteration order.
     *
     * @param indexDirectories candidate index directories
     * @return a new list holding only the defunct directories
     */
    private List<File> detectDefunctIndices(Collection<File> indexDirectories) {
        final List<File> defunctDirectories = new ArrayList<>();
        for (final File candidate : indexDirectories) {
            if (isIndexDefunct(candidate)) {
                defunctDirectories.add(candidate);
            }
        }
        return defunctDirectories;
    }

    /**
     * Reports whether an index directory is defunct, which here means that borrowing an index
     * searcher for it fails with an {@link IOException}. A searcher that was borrowed
     * successfully is returned to the index manager right away.
     *
     * @param indexDir the index directory to probe
     * @return {@code true} if the index could not be opened, {@code false} otherwise
     */
    private boolean isIndexDefunct(File indexDir) {
        final EventIndexSearcher borrowed;
        try {
            borrowed = indexManager.borrowIndexSearcher(indexDir);
        } catch (final IOException ioe) {
            logger.warn("Lucene Index {} could not be opened. Assuming that index is defunct and will re-index events belonging to this index.", indexDir);
            return true;
        }

        if (borrowed != null) {
            indexManager.returnIndexSearcher(borrowed);
        }
        return false;
    }

    /**
     * Returns the event ID from which re-indexing should start for a partition: 10,000 below
     * the maximum indexed event ID, but never less than zero.
     *
     * @param partitionName name of the partition
     * @return the minimum event ID to re-index
     */
    @Override
    public long getMinimumEventIdToReindex(String partitionName) {
        final long maxIndexedEventId = getMaxEventId(partitionName);
        final long candidate = maxIndexedEventId - 10000L;
        return candidate < 0L ? 0L : candidate;
    }

    /**
     * @return the directory manager created by the constructor
     */
    protected IndexDirectoryManager getDirectoryManager() {
        return directoryManager;
    }

    /**
     * Marks the index as closed, shuts down all executors, asks every index task to shut down
     * and waits for each of them, then closes the index manager.
     *
     * <p>If the calling thread is interrupted while waiting for an index task, the failure is
     * logged, the remaining tasks are still awaited, and the thread's interrupt status is
     * restored before this method returns.
     *
     * @throws IOException if closing the index manager fails
     */
    @Override
    public void close() throws IOException {
        this.closed = true;
        this.queryExecutor.shutdownNow();
        this.indexExecutor.shutdown();
        this.cacheWarmerExecutor.shutdown();
        // maintenanceExecutor is only created by initialize().
        if (this.maintenanceExecutor != null) {
            this.maintenanceExecutor.shutdown();
        }

        // Request shutdown of all tasks first so that they wind down in parallel.
        final ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
        for (final EventIndexTask eventIndexTask : this.indexTasks) {
            futures.add(eventIndexTask.shutdown());
        }

        boolean interrupted = false;
        for (final Future<?> future : futures) {
            try {
                future.get();
            } catch (final InterruptedException ie) {
                // Remember the interrupt but keep waiting for the other tasks.
                interrupted = true;
                logger.error("Failed to shutdown Index Task", (Throwable) ie);
            } catch (final Exception e) {
                logger.error("Failed to shutdown Index Task", (Throwable) e);
            }
        }

        try {
            this.indexManager.close();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Estimates the largest event ID that has been indexed for a partition.
     * Index directories of the partition are visited newest first; the ID is read from the
     * document with the highest document number of the first index that can be read.
     * Indices that cannot be opened, cannot be searched, or contain no documents are skipped.
     *
     * @param partitionName name of the partition
     * @return the event ID found, or {@code -1} if the partition has no index directories or
     *         none of them yielded an event ID
     */
    long getMaxEventId(String partitionName) {
        final List<File> allDirectories = this.getDirectoryManager().getDirectories(Long.valueOf(0L), Long.valueOf(Long.MAX_VALUE), partitionName);
        if (allDirectories.isEmpty()) {
            return -1L;
        }

        allDirectories.sort(DirectoryUtils.NEWEST_INDEX_FIRST);
        for (final File directory : allDirectories) {
            final EventIndexSearcher searcher;
            try {
                searcher = this.indexManager.borrowIndexSearcher(directory);
            } catch (final IOException ioe) {
                logger.warn("Unable to read from Index Directory {}. Will assume that the index is incomplete and not consider this index when determining max event ID", (Object) directory);
                continue;
            }

            try {
                final IndexReader reader = searcher.getIndexSearcher().getIndexReader();
                final int maxDocId = reader.maxDoc() - 1;
                if (maxDocId < 0) {
                    // Empty index: there is no document to read, and asking for document -1 would throw.
                    logger.debug("Index Directory {} contains no documents. Will not consider this index when determining max event ID", (Object) directory);
                    continue;
                }

                final Document document = reader.document(maxDocId);
                final long eventId = document.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
                logger.info("Determined that Max Event ID indexed for Partition {} is approximately {} based on index {}", partitionName, eventId, directory);
                return eventId;
            } catch (final IOException ioe) {
                logger.warn("Unable to search Index Directory {}. Will assume that the index is incomplete and not consider this index when determining max event ID", (Object) directory, (Object) ioe);
            } finally {
                // Runs on return, continue and exception alike.
                this.indexManager.returnIndexSearcher(searcher);
            }
        }
        return -1L;
    }

    /**
     * Indicates whether the caller should re-index events itself.
     *
     * @return {@code false} when the newest index has been found defunct, because that index is
     *         rebuilt in the background; {@code true} otherwise
     */
    @Override
    public boolean isReindexNecessary() {
        // Only announce that re-indexing is skipped when it actually is skipped.
        if (this.newestIndexDefunct) {
            logger.info("Will avoid re-indexing Provenance Events because the newest index is defunct, so it will be re-indexed in the background");
        }
        return !this.newestIndexDefunct;
    }

    /**
     * Re-indexes the given events synchronously using a dedicated {@link EventIndexTask}.
     * Nothing is done when the newest index is defunct, since that index is rebuilt in the
     * background. Every event is pushed to the cached queries; events that convert to a
     * Lucene document are assigned an index directory and re-indexed without forcing a commit.
     * An {@link IOException} during re-indexing is logged and reported, not thrown.
     *
     * @param events events to re-index, mapped to their storage summaries
     */
    @Override
    public void reindexEvents(Map<ProvenanceEventRecord, StorageSummary> events) {
        if (this.newestIndexDefunct) {
            logger.info("Will avoid re-indexing {} events because the newest index is defunct, so it will be re-indexed in the background", (Object) events.size());
            return;
        }

        final EventIndexTask indexTask = new EventIndexTask(this.documentQueue, this.indexManager, this.directoryManager, 1000000, this.eventReporter);

        // Directory chosen for the previous event; reused while the event time stays the same.
        File lastIndexDir = null;
        long lastEventTime = -2L;

        final List<IndexableDocument> indexableDocs = new ArrayList<IndexableDocument>(events.size());
        for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
            final ProvenanceEventRecord event = entry.getKey();
            final StorageSummary summary = entry.getValue();

            for (final CachedQuery cachedQuery : this.cachedQueries) {
                cachedQuery.update(event, summary);
            }

            final Document document = this.eventConverter.convert(event, summary);
            if (document == null) {
                logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", (Object) event.getEventId());
                continue;
            }

            final long eventTime = event.getEventTime();
            final File indexDir;
            if (eventTime == lastEventTime) {
                indexDir = lastIndexDir;
            } else {
                final List<File> files = this.getDirectoryManager().getDirectories(Long.valueOf(eventTime), null, false);
                if (files.isEmpty()) {
                    final String partitionName = summary.getPartitionName().get();
                    indexDir = this.getDirectoryManager().getWritableIndexingDirectory(eventTime, partitionName);
                } else {
                    indexDir = files.get(0);
                }
                lastIndexDir = indexDir;
                // Remember the time as well; without it the shortcut above could never be taken.
                lastEventTime = eventTime;
            }

            indexableDocs.add(new IndexableDocument(document, summary, indexDir));
        }

        try {
            indexTask.reIndex(indexableDocs, CommitPreference.PREVENT_COMMIT);
        } catch (final IOException ioe) {
            logger.error("Failed to reindex some Provenance Events", (Throwable) ioe);
            this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to re-index some Provenance Events. Some Provenance Events may not be available for querying. See logs for more information.");
        }
    }

    /**
     * Commits the index writer of the partition's active index directory, if the partition has
     * one. The writer is always handed back to the index manager, even when the commit fails.
     *
     * @param partitionName name of the partition whose active index should be committed
     * @throws IOException if the writer cannot be borrowed or the commit fails
     */
    @Override
    public void commitChanges(String partitionName) throws IOException {
        final Optional<File> activeIndexDir = directoryManager.getActiveIndexDirectory(partitionName);
        if (!activeIndexDir.isPresent()) {
            return;
        }

        final EventIndexWriter writer = indexManager.borrowIndexWriter(activeIndexDir.get());
        try {
            writer.commit();
        } finally {
            indexManager.returnIndexWriter(writer, false, false);
        }
    }

    /**
     * Feeds one event to the cached queries and, if it converts to a Lucene document, places
     * that document on the indexing queue. When the queue is full the method retries with a
     * one-second timed offer until the document is accepted or the index is closed, recording
     * the time spent waiting. If the thread is interrupted while waiting, the interrupt status
     * is restored and the event is dropped.
     *
     * @param event    the provenance event
     * @param location where the event is stored
     */
    protected void addEvent(ProvenanceEventRecord event, StorageSummary location) {
        for (final CachedQuery cachedQuery : cachedQueries) {
            cachedQuery.update(event, location);
        }

        final Document document = eventConverter.convert(event, location);
        if (document == null) {
            logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", (Object) event.getEventId());
            return;
        }

        final StoredDocument storedDocument = new StoredDocument(document, location);
        boolean enqueued = false;
        while (!enqueued && !closed) {
            // Try without blocking first; only time the wait when the queue is full.
            enqueued = documentQueue.offer(storedDocument);
            if (!enqueued) {
                final long waitStart = System.nanoTime();
                try {
                    enqueued = documentQueue.offer(storedDocument, 1L, TimeUnit.SECONDS);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    logger.warn("Interrupted while attempting to enqueue Provenance Event for indexing; this event will not be indexed");
                    return;
                }
                final long waitedNanos = System.nanoTime() - waitStart;
                queuePauseNanos.add(new TimestampedLong(Long.valueOf(waitedNanos)));
            }

            if (enqueued) {
                final long totalEventCount = eventCount.incrementAndGet();
                // Statistics are reported once per million enqueued events, and only at debug level.
                if (totalEventCount % 1000000L == 0L && logger.isDebugEnabled()) {
                    incrementAndReportStats();
                }
            }
        }
    }

    /**
     * Logs, at debug level, how long callers waited to enqueue events and how many events were
     * handed over for indexing during the last five minutes, together with the total number of
     * enqueued events. Nothing is logged if either statistic has no aggregate value.
     */
    private void incrementAndReportStats() {
        final long windowStart = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L);

        final TimestampedLong pausedNanos = queuePauseNanos.getAggregateValue(windowStart);
        if (pausedNanos == null) {
            return;
        }

        final TimestampedLong indexedEvents = eventsIndexed.getAggregateValue(windowStart);
        if (indexedEvents == null) {
            return;
        }

        final long recentEventCount = indexedEvents.getValue();
        final long pausedMillis = TimeUnit.NANOSECONDS.toMillis(pausedNanos.getValue());
        logger.debug("In the last 5 minutes, have spent {} CPU-millis waiting to enqueue events for indexing and have indexed {} events ({} since NiFi started)",
            pausedMillis, recentEventCount, eventCount.get());
    }

    /**
     * Records the batch size in the indexing statistics and then adds every event of the batch
     * through {@code addEvent}.
     *
     * @param events events to index, mapped to their storage summaries
     */
    @Override
    public void addEvents(Map<ProvenanceEventRecord, StorageSummary> events) {
        final TimestampedLong batchSize = new TimestampedLong(Long.valueOf(events.size()));
        eventsIndexed.add(batchSize);

        for (final Map.Entry<ProvenanceEventRecord, StorageSummary> eventAndLocation : events.entrySet()) {
            final ProvenanceEventRecord event = eventAndLocation.getKey();
            final StorageSummary location = eventAndLocation.getValue();
            addEvent(event, location);
        }
    }

    /**
     * Submits a FlowFile lineage computation for the FlowFile that the given event belongs to.
     * If the event cannot be retrieved or does not exist, a submission whose result carries the
     * error is registered and returned instead of starting a computation.
     *
     * @param eventId         ID of the event whose FlowFile lineage is wanted
     * @param user            the requesting user, may be {@code null}
     * @param eventAuthorizer authorizer applied to the events of the lineage
     * @return the registered lineage submission
     */
    @Override
    public ComputeLineageSubmission submitLineageComputation(long eventId, NiFiUser user, EventAuthorizer eventAuthorizer) {
        final String userId = user == null ? null : user.getIdentity();

        final Optional<ProvenanceEventRecord> eventOption;
        try {
            eventOption = this.eventStore.getEvent(eventId);
        } catch (final Exception e) {
            logger.error("Failed to retrieve Provenance Event with ID " + eventId + " to calculate data lineage due to: " + e, (Throwable) e);
            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, Long.valueOf(eventId), Collections.emptySet(), 1, userId);
            result.getResult().setError("Failed to retrieve Provenance Event with ID " + eventId + ". See logs for more information.");
            // Register the failed submission like every other error path does, so that it can
            // still be looked up by its lineage identifier.
            this.lineageSubmissionMap.put(result.getLineageIdentifier(), result);
            return result;
        }

        if (!eventOption.isPresent()) {
            final AsyncLineageSubmission result = new AsyncLineageSubmission(LineageComputationType.FLOWFILE_LINEAGE, Long.valueOf(eventId), Collections.emptySet(), 1, userId);
            result.getResult().setError("Could not find Provenance Event with ID " + eventId);
            this.lineageSubmissionMap.put(result.getLineageIdentifier(), result);
            return result;
        }

        final ProvenanceEventRecord event = eventOption.get();
        return this.submitLineageComputation(Collections.singleton(event.getFlowFileUuid()), user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE, eventId, event.getLineageStartDate(), Long.MAX_VALUE);
    }

    /**
     * Registers a lineage submission and runs one {@link QueryTask} per index directory that
     * overlaps the given time range, oldest directory first. Waits up to 500 milliseconds for
     * the result to complete before returning the submission.
     *
     * <p>The index directories are looked up once, so the number of steps announced to the
     * submission always equals the number of query tasks submitted.
     *
     * @param flowFileUuids   UUIDs of the FlowFiles whose events make up the lineage
     * @param user            the requesting user, may be {@code null}
     * @param eventAuthorizer authorizer applied to the events found
     * @param computationType kind of lineage computation being performed
     * @param eventId         ID of the event the computation was started from, may be {@code null}
     * @param startTimestamp  lower bound used to select index directories
     * @param endTimestamp    upper bound used to select index directories
     * @return the registered submission
     * @throws IllegalArgumentException if more than {@link #MAX_LINEAGE_UUIDS} UUIDs are given
     */
    private ComputeLineageSubmission submitLineageComputation(Collection<String> flowFileUuids, NiFiUser user, EventAuthorizer eventAuthorizer, LineageComputationType computationType, Long eventId, long startTimestamp, long endTimestamp) {
        if (flowFileUuids.size() > MAX_LINEAGE_UUIDS) {
            throw new IllegalArgumentException(String.format("Cannot compute lineage for more than %s FlowFiles. This lineage contains %s.", MAX_LINEAGE_UUIDS, flowFileUuids.size()));
        }

        // Single lookup: the same list drives both the expected step count and the submitted tasks.
        final List<File> indexDirectories = this.directoryManager.getDirectories(startTimestamp, endTimestamp);
        final AsyncLineageSubmission submission = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, indexDirectories.size(), user == null ? null : user.getIdentity());
        this.lineageSubmissionMap.put(submission.getLineageIdentifier(), submission);

        final BooleanQuery lineageQuery = this.buildLineageQuery(flowFileUuids);
        if (indexDirectories.isEmpty()) {
            submission.getResult().update(Collections.emptyList(), 0L);
        } else {
            indexDirectories.sort(DirectoryUtils.OLDEST_INDEX_FIRST);
            for (final File indexDir : indexDirectories) {
                this.queryExecutor.submit(new QueryTask((Query) lineageQuery, (ProgressiveResult) submission.getResult(), MAX_LINEAGE_NODES, this.indexManager, indexDir, this.eventStore, eventAuthorizer, EventTransformer.PLACEHOLDER_TRANSFORMER));
            }
        }

        // Give fast computations a chance to finish before handing the submission back.
        try {
            submission.getResult().awaitCompletion(500L, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return submission;
    }

    /**
     * Builds a query that matches events of any of the given FlowFile UUIDs by adding one
     * optional ({@code SHOULD}) term clause per UUID.
     *
     * @param flowFileUuids the FlowFile UUIDs to match
     * @return the query, or {@code null} if {@code flowFileUuids} is {@code null} or empty
     */
    private BooleanQuery buildLineageQuery(Collection<String> flowFileUuids) {
        if (flowFileUuids == null || flowFileUuids.isEmpty()) {
            return null;
        }

        final String uuidField = SearchableFields.FlowFileUUID.getSearchableFieldName();
        final BooleanQuery.Builder builder = new BooleanQuery.Builder();
        for (final String uuid : flowFileUuids) {
            final TermQuery uuidQuery = new TermQuery(new Term(uuidField, uuid));
            builder.add(new BooleanClause((Query) uuidQuery, BooleanClause.Occur.SHOULD));
        }
        return builder.build();
    }

    /**
     * Validates and submits a provenance query.
     * The cached queries are consulted first: the first one that can answer the query supplies
     * event IDs, and the events are fetched from the event store on the query executor.
     * Otherwise the query is converted to a Lucene query and one {@link QueryTask} is submitted
     * per index directory in the query's date range, newest directory first. In both cases the
     * submission is registered under the query identifier and the method waits up to
     * 500 milliseconds for completion before returning.
     *
     * @param query      the query to run
     * @param authorizer authorizer applied to the events found
     * @param userId     identity of the submitting user
     * @return the registered query submission
     */
    @Override
    public QuerySubmission submitQuery(org.apache.nifi.provenance.search.Query query, EventAuthorizer authorizer, String userId) {
        validate(query);

        for (final CachedQuery cachedQuery : cachedQueries) {
            final Optional<List<Long>> cachedEventIds = cachedQuery.evaluate(query);
            if (cachedEventIds.isPresent()) {
                final AsyncQuerySubmission cachedSubmission = new AsyncQuerySubmission(query, 1, userId);
                querySubmissionMap.put(query.getIdentifier(), cachedSubmission);

                final List<Long> eventIds = cachedEventIds.get();
                logger.debug("Cached Query {} produced {} Event IDs for {}: {}", cachedQuery, eventIds.size(), query, eventIds);

                // Fetch the events off the calling thread.
                queryExecutor.submit(() -> {
                    try {
                        final List<ProvenanceEventRecord> events = eventStore.getEvents(eventIds, authorizer, EventTransformer.EMPTY_TRANSFORMER);
                        logger.debug("Retrieved {} of {} Events from Event Store", events.size(), eventIds.size());
                        cachedSubmission.getResult().update(events, (long) eventIds.size());
                    } catch (final Exception e) {
                        cachedSubmission.getResult().setError("Failed to retrieve Provenance Events from store; see logs for more details");
                        logger.error("Failed to retrieve Provenance Events from store", (Throwable) e);
                    }
                });

                try {
                    cachedSubmission.getResult().awaitCompletion(500L, TimeUnit.MILLISECONDS);
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
                return cachedSubmission;
            }
        }

        // No cached query applies: search the Lucene indices covering the requested date range.
        final Long startMillis = query.getStartDate() == null ? null : Long.valueOf(query.getStartDate().getTime());
        final Long endMillis = query.getEndDate() == null ? null : Long.valueOf(query.getEndDate().getTime());
        final List<File> indexDirectories = directoryManager.getDirectories(startMillis, endMillis);

        final AsyncQuerySubmission submission = new AsyncQuerySubmission(query, indexDirectories.size(), userId);
        querySubmissionMap.put(query.getIdentifier(), submission);

        final Query luceneQuery = LuceneUtil.convertQuery(query);
        logger.debug("Submitting query {} with identifier {} against {} index directories: {}", luceneQuery, query.getIdentifier(), indexDirectories.size(), indexDirectories);

        if (!indexDirectories.isEmpty()) {
            indexDirectories.sort(DirectoryUtils.NEWEST_INDEX_FIRST);
            for (final File indexDir : indexDirectories) {
                queryExecutor.submit(new QueryTask(luceneQuery, (ProgressiveResult) submission.getResult(), query.getMaxResults(), indexManager, indexDir,
                    eventStore, authorizer, EventTransformer.EMPTY_TRANSFORMER));
            }
        } else {
            submission.getResult().update(Collections.emptyList(), 0L);
        }

        try {
            submission.getResult().awaitCompletion(500L, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return submission;
    }

    /**
     * Looks up the most recent cached event for a component: takes the last event ID that the
     * per-processor cached query holds for the component and fetches that event from the store.
     *
     * @param componentId ID of the component
     * @return the event, or an empty Optional if no ID is cached or the store no longer has the event
     * @throws IOException if the event store fails to read the event
     */
    @Override
    public Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException {
        final List<Long> cachedEventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
        if (cachedEventIds.isEmpty()) {
            logger.info("There are no recent Provenance Events cached for Component with ID {}", (Object) componentId);
            return Optional.empty();
        }

        final Long newestEventId = cachedEventIds.get(cachedEventIds.size() - 1);
        final Optional<ProvenanceEventRecord> newestEvent = eventStore.getEvent(newestEventId);
        if (!newestEvent.isPresent()) {
            logger.info("There are no recent Provenance Events cached for Component with ID {}", (Object) componentId);
        } else {
            logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", (Object) newestEvent.get(), (Object) componentId);
        }
        return newestEvent;
    }

    /**
     * Submits a FlowFile lineage computation for a single FlowFile UUID, selecting index
     * directories for the time range from 0 to {@code Long.MAX_VALUE} and with no originating
     * event ID.
     *
     * @param flowFileUuid    UUID of the FlowFile
     * @param user            the requesting user, may be {@code null}
     * @param eventAuthorizer authorizer applied to the events found
     * @return the registered lineage submission
     */
    @Override
    public ComputeLineageSubmission submitLineageComputation(String flowFileUuid, NiFiUser user, EventAuthorizer eventAuthorizer) {
        final Collection<String> singleUuid = Collections.singleton(flowFileUuid);
        final Long noEventId = null;
        return submitLineageComputation(singleUuid, user, eventAuthorizer, LineageComputationType.FLOWFILE_LINEAGE, noEventId, 0L, Long.MAX_VALUE);
    }

    /**
     * Submits a lineage computation that expands the children of the given event.
     * Only CLONE, FORK, JOIN and REPLAY events can be expanded; for those, lineage is computed
     * for the event's child UUIDs starting at the event time. In every other case a submission
     * is registered and returned immediately: with an empty result when the event does not
     * exist, or with an error when the event type is unsupported or anything fails.
     *
     * @param eventId    ID of the event whose children should be expanded
     * @param user       the requesting user, may be {@code null}
     * @param authorizer authorizer applied to the events found
     * @return the registered lineage submission
     */
    @Override
    public ComputeLineageSubmission submitExpandChildren(long eventId, NiFiUser user, EventAuthorizer authorizer) {
        final String userId = (user != null) ? user.getIdentity() : null;

        try {
            final Optional<ProvenanceEventRecord> eventOption = eventStore.getEvent(eventId);
            if (eventOption.isPresent()) {
                final ProvenanceEventRecord event = eventOption.get();
                switch (event.getEventType()) {
                    case CLONE:
                    case FORK:
                    case JOIN:
                    case REPLAY:
                        return submitLineageComputation(event.getChildUuids(), user, authorizer, LineageComputationType.EXPAND_CHILDREN, eventId, event.getEventTime(), Long.MAX_VALUE);
                    default:
                        break;
                }

                // Any other event type has no children to expand.
                final AsyncLineageSubmission unsupported = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, Long.valueOf(eventId), Collections.emptyList(), 1, userId);
                lineageSubmissionMap.put(unsupported.getLineageIdentifier(), unsupported);
                unsupported.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded");
                return unsupported;
            }

            // Unknown event: complete right away with an empty result.
            final AsyncLineageSubmission missing = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, Long.valueOf(eventId), Collections.emptyList(), 1, userId);
            lineageSubmissionMap.put(missing.getLineageIdentifier(), missing);
            missing.getResult().update(Collections.emptyList(), 0L);
            return missing;
        } catch (final Exception e) {
            final AsyncLineageSubmission failed = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, Long.valueOf(eventId), Collections.emptyList(), 1, userId);
            lineageSubmissionMap.put(failed.getLineageIdentifier(), failed);
            failed.getResult().setError("Failed to expand children for lineage of event with ID " + eventId + " due to: " + e);
            return failed;
        }
    }

    /**
     * Submits a lineage computation that expands the parents of the given event.
     *
     * <ul>
     *   <li>If the event store has no event with this ID, an already-completed submission with an
     *       empty result is registered and returned.</li>
     *   <li>If the event is a CLONE, FORK, JOIN or REPLAY, the lineage of its parent UUIDs is
     *       computed between the event's lineage start date and the event's own time.</li>
     *   <li>For any other event type, or if any exception is thrown along the way, a submission
     *       whose result carries an error message is registered and returned.</li>
     * </ul>
     *
     * @param eventId the ID of the event whose parents should be expanded
     * @param user the requesting user; may be {@code null}, in which case the submission has no submitter identity
     * @param authorizer the authorizer passed through to the lineage computation
     * @return the lineage submission; never thrown as an exception for lookup failures
     */
    @Override
    public ComputeLineageSubmission submitExpandParents(long eventId, NiFiUser user, EventAuthorizer authorizer) {
        final String submitterId;
        if (user == null) {
            submitterId = null;
        } else {
            submitterId = user.getIdentity();
        }

        try {
            final Optional<ProvenanceEventRecord> lookup = this.eventStore.getEvent(eventId);
            if (lookup.isPresent()) {
                final ProvenanceEventRecord childEvent = lookup.get();
                switch (childEvent.getEventType()) {
                    case CLONE:
                    case FORK:
                    case JOIN:
                    case REPLAY:
                        return this.submitLineageComputation(childEvent.getParentUuids(), user, authorizer, LineageComputationType.EXPAND_PARENTS, eventId, childEvent.getLineageStartDate(), childEvent.getEventTime());
                    default:
                        break;
                }

                // Any other event type has no parents to expand: register a failed submission.
                final AsyncLineageSubmission rejected = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, submitterId);
                this.lineageSubmissionMap.put(rejected.getLineageIdentifier(), rejected);
                rejected.getResult().setError("Event ID " + eventId + " indicates an event of type " + childEvent.getEventType() + " so its parents cannot be expanded");
                return rejected;
            }

            // Unknown event: register a submission that is immediately updated with an empty result.
            final AsyncLineageSubmission emptySubmission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, submitterId);
            this.lineageSubmissionMap.put(emptySubmission.getLineageIdentifier(), emptySubmission);
            emptySubmission.getResult().update(Collections.emptyList(), 0L);
            return emptySubmission;
        } catch (final Exception failure) {
            final AsyncLineageSubmission failed = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.emptyList(), 1, submitterId);
            this.lineageSubmissionMap.put(failed.getLineageIdentifier(), failed);
            failed.getResult().setError("Failed to expand parents for lineage of event with ID " + eventId + " due to: " + failure);
            return failed;
        }
    }

    /**
     * Looks up a previously registered lineage submission and checks that the caller may see it.
     *
     * <p>Access rules, in order:</p>
     * <ul>
     *   <li>no submission registered under the identifier: {@code null} is returned;</li>
     *   <li>submission has no submitter identity: returned to any caller, including a {@code null} user;</li>
     *   <li>submission has a submitter identity and {@code user} is {@code null}: access denied;</li>
     *   <li>submission has a submitter identity: returned only if it equals the user's identity.</li>
     * </ul>
     *
     * @param lineageIdentifier the identifier the submission was registered under
     * @param user the user requesting the submission; may be {@code null}
     * @return the submission, or {@code null} if none is registered under the identifier
     *         (for example because it was never submitted or has already been purged)
     * @throws AccessDeniedException if the submission belongs to a different user, or to a known
     *         user while no user was supplied
     */
    public AsyncLineageSubmission retrieveLineageSubmission(String lineageIdentifier, NiFiUser user) {
        final AsyncLineageSubmission submission = (AsyncLineageSubmission) this.lineageSubmissionMap.get(lineageIdentifier);
        if (submission == null) {
            // Unknown or already-purged identifier: report "not found" instead of failing with a
            // NullPointerException when dereferencing the missing submission.
            return null;
        }

        final String submitterId = submission.getSubmitterIdentity();
        if (submitterId == null) {
            // Submitted without an identity, so there is no owner to check against.
            return submission;
        }
        if (user == null) {
            throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because no user id was provided");
        }
        if (submitterId.equals(user.getIdentity())) {
            return submission;
        }
        throw new AccessDeniedException("Cannot retrieve Provenance Lineage Submission because " + user.getIdentity() + " is not the user who submitted the request");
    }

    /**
     * Looks up a previously registered query submission and checks that the caller may see it.
     *
     * <p>Access rules, in order:</p>
     * <ul>
     *   <li>no submission registered under the identifier: {@code null} is returned;</li>
     *   <li>submission has no submitter identity: returned to any caller, including a {@code null} user;</li>
     *   <li>submission has a submitter identity and {@code user} is {@code null}: access denied;</li>
     *   <li>submission has a submitter identity: returned only if it equals the user's identity.</li>
     * </ul>
     *
     * @param queryIdentifier the identifier the submission was registered under
     * @param user the user requesting the submission; may be {@code null}
     * @return the submission, or {@code null} if none is registered under the identifier
     *         (for example because it was never submitted or has already been purged)
     * @throws AccessDeniedException if the submission belongs to a different user, or to a known
     *         user while no user was supplied
     */
    @Override
    public QuerySubmission retrieveQuerySubmission(String queryIdentifier, NiFiUser user) {
        final QuerySubmission submission = (QuerySubmission) this.querySubmissionMap.get(queryIdentifier);
        if (submission == null) {
            // Unknown or already-purged identifier: report "not found" instead of failing with a
            // NullPointerException when dereferencing the missing submission.
            return null;
        }

        final String submitterId = submission.getSubmitterIdentity();
        if (submitterId == null) {
            // Submitted without an identity, so there is no owner to check against.
            return submission;
        }
        if (user == null) {
            throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because no user id was provided");
        }
        if (submitterId.equals(user.getIdentity())) {
            return submission;
        }
        throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
    }

    /**
     * Computes the combined size of the index directories known to the directory manager.
     *
     * <p>Both bounds passed to {@code getDirectories} are {@code null}; presumably this selects
     * every directory regardless of time range (confirm against the directory manager).</p>
     *
     * @return the sum of {@code DirectoryUtils.getSize} over all returned directories
     */
    @Override
    public long getSize() {
        long combinedSize = 0L;
        final Iterator<File> directories = this.directoryManager.getDirectories(null, null).iterator();
        while (directories.hasNext()) {
            final File indexDirectory = directories.next();
            combinedSize += DirectoryUtils.getSize(indexDirectory);
        }
        return combinedSize;
    }

    /**
     * Validates that a new query may be accepted.
     *
     * <p>Two checks are made:</p>
     * <ol>
     *   <li>If more than 10 query submissions are currently registered, obsolete ones are purged;
     *       if more than 10 still remain afterwards the query is rejected.</li>
     *   <li>If both a start and an end date are set, the start must not be after the end.</li>
     * </ol>
     *
     * @param query the query about to be submitted
     * @throws IllegalStateException if too many undeleted query results remain after purging
     * @throws IllegalArgumentException if the query's start date is after its end date
     */
    private void validate(org.apache.nifi.provenance.search.Query query) {
        final int maxUndeletedQueries = 10;

        if (this.querySubmissionMap.size() > maxUndeletedQueries) {
            this.purgeObsoleteQueries();

            // Re-read the size after purging so the reported count matches what is actually
            // registered now, rather than the stale pre-purge number.
            final int remainingQueries = this.querySubmissionMap.size();
            if (remainingQueries > maxUndeletedQueries) {
                throw new IllegalStateException("Cannot process query because there are currently " + remainingQueries + " queries whose results have not been deleted due to poorly behaving clients not issuing DELETE requests. Please try again later.");
            }
        }

        final Date startDate = query.getStartDate();
        final Date endDate = query.getEndDate();
        if (endDate != null && startDate != null && startDate.getTime() > endDate.getTime()) {
            throw new IllegalArgumentException("Query End Time cannot be before Query Start Time");
        }
    }

    /**
     * Removes Lucene index directories that are older than the earliest event still held by the
     * event store.
     *
     * <p>The earliest event time is taken from the first event returned by
     * {@code eventStore.getEvents(0L, 1)}; if the store returns no events, the current time is used
     * instead. Every directory the directory manager reports as being before that time is handed
     * to {@link #tryDeleteIndex(File)}.</p>
     *
     * <p>Any exception is logged and reported through the event reporter; it is not rethrown.</p>
     */
    void performMaintenance() {
        try {
            final long earliestEventTime;
            final List<ProvenanceEventRecord> firstEvents = this.eventStore.getEvents(0L, 1);
            if (firstEvents.isEmpty()) {
                earliestEventTime = System.currentTimeMillis();
                // Log the timestamp that is actually used as the cut-off, not a second clock reading.
                logger.debug("Found no events in the Provenance Repository. In order to perform maintenace of the indices, will assume that the first event time is now ({})", (Object) earliestEventTime);
            } else {
                final ProvenanceEventRecord firstEvent = firstEvents.get(0);
                earliestEventTime = firstEvent.getEventTime();
                logger.debug("First Event Time is {} ({}) with Event ID {}; will delete any Lucene Index that is older than this", new Object[]{earliestEventTime, new Date(earliestEventTime), firstEvent.getEventId()});
            }

            final List<File> indicesBeforeEarliestEvent = this.directoryManager.getDirectoriesBefore(earliestEventTime);
            for (final File index : indicesBeforeEarliestEvent) {
                logger.debug("Index directory {} is now expired. Attempting to remove index", (Object) index);
                this.tryDeleteIndex(index);
            }
        } catch (final Exception e) {
            // Maintenance must never propagate a failure to its caller; log and report instead.
            logger.error("Failed to perform background maintenance procedures", (Throwable) e);
            this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to perform maintenance of Provenance Repository. See logs for more information.");
        }
    }

    /**
     * Attempts to remove an expired index from the index manager and delete its directory.
     *
     * <p>Removal from the index manager is retried, with a 5 second pause after each failed
     * attempt, for as long as less than 30 seconds have elapsed since the first attempt. If the
     * thread is interrupted while pausing, the interrupt flag is restored and the method gives up.</p>
     *
     * <p>Once the index manager has released the index, the directory is deleted and then
     * unregistered from the directory manager. A failure to delete the files is only logged; the
     * directory is still unregistered and the method still returns {@code true}.</p>
     *
     * @param indexDirectory the directory of the index to remove
     * @return {@code true} if the index manager released the index, {@code false} if it did not
     *         within the retry window or the thread was interrupted
     */
    protected boolean tryDeleteIndex(File indexDirectory) {
        final long retryWindowNanos = TimeUnit.SECONDS.toNanos(30L);
        final long firstAttemptNanos = System.nanoTime();

        boolean released = false;
        while (System.nanoTime() - firstAttemptNanos < retryWindowNanos) {
            released = this.indexManager.removeIndex(indexDirectory);
            if (released) {
                break;
            }

            // Still in use: back off before the next attempt.
            try {
                Thread.sleep(5000L);
            } catch (final InterruptedException interrupted) {
                logger.debug("Interrupted when trying to remove index {} from IndexManager; will not remove index", indexDirectory);
                Thread.currentThread().interrupt();
                return false;
            }
        }

        if (!released) {
            logger.warn("The Lucene Index located at {} has expired and contains no Provenance Events that still exist in the respository. However, the directory could not be deleted because it is still actively being used. Will continue to try to delete in a subsequent maintenance cycle.", indexDirectory);
            return false;
        }

        try {
            FileUtils.deleteFile(indexDirectory, true);
            logger.debug("Successfully deleted directory {}", indexDirectory);
        } catch (final IOException deleteFailure) {
            logger.warn("The Lucene Index located at " + indexDirectory + " has expired and contains no Provenance Events that still exist in the respository. However, the directory could not be deleted.", deleteFailure);
        }

        // The directory is unregistered even when the file deletion above failed.
        this.directoryManager.removeDirectory(indexDirectory);
        logger.info("Successfully removed expired Lucene Index {}", indexDirectory);
        return true;
    }

    /**
     * Drops query and lineage submissions that no longer need to be retained.
     *
     * <p>A submission is removed from its map when it has been canceled, or when its result is
     * finished and the result's expiration lies before the time this method started. Submissions
     * that are still running and not canceled are kept.</p>
     *
     * <p>Any exception is logged and not rethrown.</p>
     */
    private void purgeObsoleteQueries() {
        try {
            final Date now = new Date();

            // Iterate the value views with a wildcard iterator instead of a raw one; removing
            // through the iterator removes the corresponding mapping from the backing map.
            final Iterator<?> queryIterator = this.querySubmissionMap.values().iterator();
            while (queryIterator.hasNext()) {
                final AsyncQuerySubmission submission = (AsyncQuerySubmission) queryIterator.next();
                final StandardQueryResult result = submission.getResult();
                final boolean expired = result.isFinished() && result.getExpiration().before(now);
                if (submission.isCanceled() || expired) {
                    queryIterator.remove();
                }
            }

            final Iterator<?> lineageIterator = this.lineageSubmissionMap.values().iterator();
            while (lineageIterator.hasNext()) {
                final AsyncLineageSubmission submission = (AsyncLineageSubmission) lineageIterator.next();
                final StandardLineageResult result = submission.getResult();
                final boolean expired = result.isFinished() && result.getExpiration().before(now);
                if (submission.isCanceled() || expired) {
                    lineageIterator.remove();
                }
            }
        } catch (final Exception e) {
            logger.error("Failed to expire Provenance Query Results due to {}", (Object) e.toString());
            logger.error("", (Throwable) e);
        }
    }
}

