/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.provenance.store;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndex;
import org.apache.nifi.provenance.serialization.EventFileCompressor;
import org.apache.nifi.provenance.store.EventFileManager;
import org.apache.nifi.provenance.store.PartitionedEventStore;
import org.apache.nifi.provenance.store.RecordReaderFactory;
import org.apache.nifi.provenance.store.RecordWriterFactory;
import org.apache.nifi.provenance.store.WriteAheadStorePartition;
import org.apache.nifi.provenance.store.iterator.AggregateEventIterator;
import org.apache.nifi.provenance.store.iterator.EventIterator;

public class PartitionedWriteAheadEventStore
extends PartitionedEventStore {
    private final BlockingQueue<File> filesToCompress;
    private final List<WriteAheadStorePartition> partitions;
    private final RepositoryConfiguration repoConfig;
    private final ExecutorService compressionExecutor;
    private final List<EventFileCompressor> fileCompressors = Collections.synchronizedList(new ArrayList());
    private final EventReporter eventReporter;
    private final EventFileManager fileManager;

    public PartitionedWriteAheadEventStore(RepositoryConfiguration repoConfig, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, EventReporter eventReporter, EventFileManager fileManager) {
        super(repoConfig, eventReporter);
        this.repoConfig = repoConfig;
        this.eventReporter = eventReporter;
        this.filesToCompress = new LinkedBlockingQueue<File>(100);
        AtomicLong idGenerator = new AtomicLong(0L);
        this.fileManager = fileManager;
        this.partitions = this.createPartitions(repoConfig, recordWriterFactory, recordReaderFactory, idGenerator);
        this.compressionExecutor = repoConfig.isCompressOnRollover() ? Executors.newFixedThreadPool(repoConfig.getIndexThreadPoolSize(), (ThreadFactory)new NamedThreadFactory("Compress Provenance Logs")) : null;
    }

    private List<WriteAheadStorePartition> createPartitions(RepositoryConfiguration repoConfig, RecordWriterFactory recordWriterFactory, RecordReaderFactory recordReaderFactory, AtomicLong idGenerator) {
        Map<String, File> storageDirectories = repoConfig.getStorageDirectories();
        ArrayList<WriteAheadStorePartition> partitions = new ArrayList<WriteAheadStorePartition>(storageDirectories.size());
        for (Map.Entry<String, File> entry : storageDirectories.entrySet()) {
            String partitionName = entry.getKey();
            File storageDirectory = entry.getValue();
            partitions.add(new WriteAheadStorePartition(storageDirectory, partitionName, repoConfig, recordWriterFactory, recordReaderFactory, this.filesToCompress, idGenerator, this.eventReporter, this.fileManager));
        }
        return partitions;
    }

    @Override
    public void initialize() throws IOException {
        if (this.repoConfig.isCompressOnRollover()) {
            for (int i = 0; i < this.repoConfig.getIndexThreadPoolSize(); ++i) {
                EventFileCompressor compressor = new EventFileCompressor(this.filesToCompress, this.fileManager);
                this.compressionExecutor.submit(compressor);
                this.fileCompressors.add(compressor);
            }
        }
        super.initialize();
    }

    @Override
    public void close() throws IOException {
        super.close();
        for (EventFileCompressor compressor : this.fileCompressors) {
            compressor.shutdown();
        }
        if (this.compressionExecutor != null) {
            this.compressionExecutor.shutdown();
        }
    }

    @Override
    public void reindexLatestEvents(EventIndex eventIndex) {
        List<WriteAheadStorePartition> partitions = this.getPartitions();
        int numPartitions = partitions.size();
        ArrayList futures = new ArrayList(numPartitions);
        ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
        for (WriteAheadStorePartition writeAheadStorePartition : partitions) {
            futures.add(executor.submit(() -> partition.reindexLatestEvents(eventIndex)));
        }
        executor.shutdown();
        for (Future future : futures) {
            try {
                future.get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failed to re-index events because Thread was interrupted", e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException("Failed to re-index events", e);
            }
        }
    }

    protected List<WriteAheadStorePartition> getPartitions() {
        return this.partitions;
    }

    @Override
    public EventIterator getEventsByTimestamp(long minTimestamp, long maxTimestamp) throws IOException {
        ArrayList<EventIterator> eventIterators = new ArrayList<EventIterator>();
        for (WriteAheadStorePartition partition : this.getPartitions()) {
            EventIterator partitionEventIterator = partition.getEventsByTimestamp(minTimestamp, maxTimestamp);
            eventIterators.add(partitionEventIterator);
        }
        return new AggregateEventIterator(eventIterators);
    }
}

