/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.provenance.index.lucene;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.provenance.index.lucene.CommitPreference;
import org.apache.nifi.provenance.index.lucene.IndexDirectoryManager;
import org.apache.nifi.provenance.index.lucene.IndexableDocument;
import org.apache.nifi.provenance.index.lucene.StoredDocument;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventIndexTask
implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(EventIndexTask.class);
    private static final String EVENT_CATEGORY = "Provenance Repository";
    public static final int MAX_DOCUMENTS_PER_THREAD = 100;
    public static final int DEFAULT_MAX_EVENTS_PER_COMMIT = 1000000;
    private final BlockingQueue<StoredDocument> documentQueue;
    private final IndexManager indexManager;
    private volatile boolean shutdown = false;
    private final IndexDirectoryManager directoryManager;
    private final EventReporter eventReporter;
    private final int commitThreshold;
    private volatile CompletableFuture<Void> shutdownComplete;

    public EventIndexTask(BlockingQueue<StoredDocument> documentQueue, IndexManager indexManager, IndexDirectoryManager directoryManager, int maxEventsPerCommit, EventReporter eventReporter) {
        this.documentQueue = documentQueue;
        this.indexManager = indexManager;
        this.directoryManager = directoryManager;
        this.commitThreshold = maxEventsPerCommit;
        this.eventReporter = eventReporter;
    }

    public synchronized Future<Void> shutdown() {
        if (this.shutdownComplete == null) {
            this.shutdownComplete = new CompletableFuture();
        }
        this.shutdown = true;
        return this.shutdownComplete;
    }

    private void fetchDocuments(List<StoredDocument> destination) throws InterruptedException {
        StoredDocument firstDoc = this.documentQueue.poll(10L, TimeUnit.MILLISECONDS);
        if (firstDoc == null) {
            return;
        }
        destination.add(firstDoc);
        this.documentQueue.drainTo(destination, 99);
    }

    @Override
    public void run() {
        ArrayList<StoredDocument> toIndex = new ArrayList<StoredDocument>(100);
        while (!this.shutdown) {
            try {
                toIndex.clear();
                this.fetchDocuments(toIndex);
                if (toIndex.isEmpty()) continue;
                Map<String, List<StoredDocument>> docsByPartition = toIndex.stream().collect(Collectors.groupingBy(doc -> doc.getStorageSummary().getPartitionName().get()));
                for (Map.Entry<String, List<StoredDocument>> entry : docsByPartition.entrySet()) {
                    String partitionName = entry.getKey();
                    List<StoredDocument> docs = entry.getValue();
                    this.index(docs, partitionName);
                }
            }
            catch (Exception e) {
                logger.error("Failed to index Provenance Events", (Throwable)e);
                this.eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to index Provenance Events. See logs for more information.");
            }
        }
        CompletableFuture<Void> future = this.shutdownComplete;
        if (future != null) {
            future.complete(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void reIndex(List<IndexableDocument> toIndex, CommitPreference commitPreference) throws IOException {
        if (toIndex.isEmpty()) {
            return;
        }
        Map<File, List<IndexableDocument>> docsByIndexDir = toIndex.stream().collect(Collectors.groupingBy(IndexableDocument::getIndexDirectory));
        for (Map.Entry<File, List<IndexableDocument>> entry : docsByIndexDir.entrySet()) {
            File indexDirectory = entry.getKey();
            List<IndexableDocument> documentsForIndex = entry.getValue();
            EventIndexWriter indexWriter = this.indexManager.borrowIndexWriter(indexDirectory);
            try {
                long minId = Long.MAX_VALUE;
                long maxId = Long.MIN_VALUE;
                for (IndexableDocument doc : toIndex) {
                    long eventId = doc.getDocument().getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue();
                    if (eventId < minId) {
                        minId = eventId;
                    }
                    if (eventId <= maxId) continue;
                    maxId = eventId;
                }
                Query query = LongPoint.newRangeQuery((String)SearchableFields.Identifier.getSearchableFieldName(), (long)minId, (long)maxId);
                indexWriter.getIndexWriter().deleteDocuments(new Query[]{query});
                List<Document> documents = documentsForIndex.stream().map(IndexableDocument::getDocument).collect(Collectors.toList());
                indexWriter.index(documents, this.commitThreshold);
            }
            finally {
                this.indexManager.returnIndexWriter(indexWriter, CommitPreference.FORCE_COMMIT.equals((Object)commitPreference), false);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void index(List<StoredDocument> toIndex, String partitionName) throws IOException {
        EventIndexWriter indexWriter;
        File indexDirectory;
        if (toIndex.isEmpty()) {
            return;
        }
        List<Document> documents = toIndex.stream().map(StoredDocument::getDocument).collect(Collectors.toList());
        boolean requestClose = false;
        boolean requestCommit = false;
        long minEventTime = toIndex.stream().mapToLong(doc -> doc.getDocument().getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue()).min().getAsLong();
        IndexDirectoryManager indexDirectoryManager = this.directoryManager;
        synchronized (indexDirectoryManager) {
            indexDirectory = this.directoryManager.getWritableIndexingDirectory(minEventTime, partitionName);
            indexWriter = this.indexManager.borrowIndexWriter(indexDirectory);
        }
        try {
            boolean writerIndicatesCommit = indexWriter.index(documents, this.commitThreshold);
            Optional<File> activeIndexDirOption = this.directoryManager.getActiveIndexDirectory(partitionName);
            if (!activeIndexDirOption.isPresent() || !activeIndexDirOption.get().equals(indexDirectory)) {
                requestCommit = true;
                requestClose = true;
            }
            if (writerIndicatesCommit) {
                this.commit(indexWriter);
                requestCommit = false;
                boolean directoryManagerIndicatesClose = this.directoryManager.onIndexCommitted(indexDirectory);
                boolean bl = requestClose = requestClose || directoryManagerIndicatesClose;
                if (logger.isDebugEnabled()) {
                    long maxId = documents.stream().mapToLong(doc -> doc.getField(SearchableFields.Identifier.getSearchableFieldName()).numericValue().longValue()).max().orElse(-1L);
                    logger.debug("Committed index {} after writing a max Event ID of {}", (Object)indexDirectory, (Object)maxId);
                }
            }
        }
        finally {
            this.indexManager.returnIndexWriter(indexWriter, requestCommit, requestClose);
        }
    }

    protected void commit(EventIndexWriter indexWriter) throws IOException {
        long start = System.nanoTime();
        long approximateCommitCount = indexWriter.commit();
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        logger.debug("Successfully committed approximately {} Events to {} in {} millis", new Object[]{approximateCommitCount, indexWriter, millis});
    }
}

