package org.apache.nifi.provenance.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.EventFileCompressor;
import org.apache.nifi.provenance.store.iterator.AggregateEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;

/* loaded from: input_file:org/apache/nifi/provenance/store/PartitionedWriteAheadEventStore.class */
/**
 * A {@link PartitionedEventStore} that creates one {@link WriteAheadStorePartition} per storage
 * directory returned by {@link RepositoryConfiguration#getStorageDirectories()}.
 *
 * <p>When {@link RepositoryConfiguration#isCompressOnRollover()} is true, a fixed thread pool is
 * created and, on {@link #initialize()}, populated with {@link EventFileCompressor} tasks that are
 * given the same bounded file queue that is handed to every partition (presumably the partitions
 * enqueue rolled-over files there; confirm against {@code WriteAheadStorePartition}).
 */
public class PartitionedWriteAheadEventStore extends PartitionedEventStore {
    /** Capacity of the bounded queue shared between the partitions and the compressor tasks. */
    private static final int COMPRESSION_QUEUE_CAPACITY = 100;

    private final BlockingQueue<File> filesToCompress;
    private final List<WriteAheadStorePartition> partitions;
    private final RepositoryConfiguration repoConfig;
    /** {@code null} when compress-on-rollover is disabled. */
    private final ExecutorService compressionExecutor;
    private final List<EventFileCompressor> fileCompressors;
    private final EventReporter eventReporter;
    private final EventFileManager fileManager;

    /**
     * Builds the store and its partitions; compressor tasks are not started until
     * {@link #initialize()} is called.
     *
     * @param repositoryConfiguration configuration supplying the storage directories, the
     *                                compress-on-rollover flag and the thread pool size
     * @param recordWriterFactory     passed through to every partition
     * @param recordReaderFactory     passed through to every partition
     * @param eventReporter           passed to the superclass and to every partition
     * @param eventFileManager        passed to every partition and every compressor task
     */
    public PartitionedWriteAheadEventStore(RepositoryConfiguration repositoryConfiguration, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, EventReporter eventReporter, EventFileManager eventFileManager) {
        super(repositoryConfiguration, eventReporter);
        this.fileCompressors = Collections.synchronizedList(new ArrayList<>());
        this.repoConfig = repositoryConfiguration;
        this.eventReporter = eventReporter;
        this.filesToCompress = new LinkedBlockingQueue<>(COMPRESSION_QUEUE_CAPACITY);
        this.fileManager = eventFileManager;

        // createPartitions reads this.filesToCompress, this.eventReporter and this.fileManager,
        // so it must run only after those fields have been assigned.
        // A single AtomicLong instance, starting at 0, is shared by all partitions.
        final AtomicLong sharedCounter = new AtomicLong(0L);
        this.partitions = createPartitions(repositoryConfiguration, recordWriterFactory, recordReaderFactory, sharedCounter);

        if (repositoryConfiguration.isCompressOnRollover()) {
            this.compressionExecutor = Executors.newFixedThreadPool(repositoryConfiguration.getIndexThreadPoolSize(), new NamedThreadFactory("Compress Provenance Logs"));
        } else {
            this.compressionExecutor = null;
        }
    }

    /**
     * Creates one partition per configured storage directory. The map key is passed as the
     * partition's second constructor argument and the map value (a directory) as the first.
     *
     * @return the partitions, in the iteration order of the storage-directory map
     */
    private List<WriteAheadStorePartition> createPartitions(RepositoryConfiguration repositoryConfiguration, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, AtomicLong atomicLong) {
        final Map<String, File> storageDirectories = repositoryConfiguration.getStorageDirectories();
        final List<WriteAheadStorePartition> created = new ArrayList<>(storageDirectories.size());
        for (final Map.Entry<String, File> entry : storageDirectories.entrySet()) {
            created.add(new WriteAheadStorePartition(entry.getValue(), entry.getKey(), repositoryConfiguration, recordWriterFactory, recordReaderFactory, this.filesToCompress, atomicLong, this.eventReporter, this.fileManager));
        }
        return created;
    }

    /**
     * Submits the compressor tasks (when compress-on-rollover is enabled) and then delegates to
     * the superclass initialization.
     *
     * @throws IOException if the superclass initialization fails
     */
    @Override // org.apache.nifi.provenance.store.PartitionedEventStore, org.apache.nifi.provenance.store.EventStore
    public void initialize() throws IOException {
        if (this.repoConfig.isCompressOnRollover()) {
            // One compressor task per pool thread; the executor was sized with the same value.
            final int compressorCount = this.repoConfig.getIndexThreadPoolSize();
            for (int i = 0; i < compressorCount; i++) {
                final EventFileCompressor compressor = new EventFileCompressor(this.filesToCompress, this.fileManager);
                this.compressionExecutor.submit(compressor);
                this.fileCompressors.add(compressor);
            }
        }
        super.initialize();
    }

    /**
     * Closes the superclass resources, then shuts down every compressor task and the compression
     * executor. The compressors and executor are shut down even if the superclass close fails, so
     * that pool threads are not left running.
     *
     * @throws IOException if the superclass close fails
     */
    @Override // org.apache.nifi.provenance.store.PartitionedEventStore, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            // Iterating a Collections.synchronizedList requires holding the list's own monitor.
            synchronized (this.fileCompressors) {
                for (final EventFileCompressor compressor : this.fileCompressors) {
                    compressor.shutdown();
                }
            }
            if (this.compressionExecutor != null) {
                this.compressionExecutor.shutdown();
            }
        }
    }

    /**
     * Asks every partition to re-index its latest events into the given index, running the
     * partitions concurrently on a temporary pool with one thread per partition, and blocks until
     * all of them have finished. Does nothing when there are no partitions.
     *
     * @param eventIndex the index handed to each partition
     * @throws RuntimeException if the calling thread is interrupted while waiting (the interrupt
     *                          flag is restored first) or if any partition's task fails
     */
    @Override // org.apache.nifi.provenance.store.EventStore
    public void reindexLatestEvents(EventIndex eventIndex) {
        final List<WriteAheadStorePartition> storePartitions = getPartitions();
        if (storePartitions.isEmpty()) {
            // Executors.newFixedThreadPool rejects a pool size of 0, and there is nothing to do.
            return;
        }

        final List<Future<?>> pending = new ArrayList<>(storePartitions.size());
        final ExecutorService executor = Executors.newFixedThreadPool(storePartitions.size());
        try {
            for (final WriteAheadStorePartition partition : storePartitions) {
                pending.add(executor.submit(() -> {
                    partition.reindexLatestEvents(eventIndex);
                }));
            }
        } finally {
            // No further tasks will be submitted; already-submitted tasks still run to completion.
            executor.shutdown();
        }

        for (final Future<?> future : pending) {
            try {
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to re-index events because Thread was interrupted", e);
            } catch (ExecutionException e2) {
                throw new RuntimeException("Failed to re-index events", e2);
            }
        }
    }

    /**
     * @return the partitions created by the constructor (the internal list itself, not a copy)
     */
    @Override // org.apache.nifi.provenance.store.PartitionedEventStore
    protected List<WriteAheadStorePartition> getPartitions() {
        return this.partitions;
    }

    /**
     * Collects the per-partition iterators for the given timestamp arguments and wraps them in a
     * single {@link AggregateEventIterator}.
     *
     * @param j  first timestamp argument, forwarded unchanged to every partition
     *           (presumably the lower bound; confirm against {@code WriteAheadStorePartition})
     * @param j2 second timestamp argument, forwarded unchanged to every partition
     *           (presumably the upper bound; confirm against {@code WriteAheadStorePartition})
     * @return an iterator aggregating the iterators of all partitions, in partition order
     * @throws IOException if any partition fails to produce its iterator
     */
    @Override // org.apache.nifi.provenance.store.EventStore
    public EventIterator getEventsByTimestamp(long j, long j2) throws IOException {
        final List<EventIterator> partitionIterators = new ArrayList<>();
        for (final WriteAheadStorePartition partition : getPartitions()) {
            partitionIterators.add(partition.getEventsByTimestamp(j, j2));
        }
        return new AggregateEventIterator(partitionIterators);
    }
}
