package org.apache.nifi.controller.status.history;

import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.DefaultCairoConfiguration;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
import org.apache.nifi.controller.status.history.questdb.QuestDbDatabaseManager;
import org.apache.nifi.controller.status.history.storage.BufferedWriterFlushWorker;
import org.apache.nifi.controller.status.history.storage.BufferedWriterForStatusStorage;
import org.apache.nifi.controller.status.history.storage.ComponentStatusStorage;
import org.apache.nifi.controller.status.history.storage.GarbageCollectionStatusStorage;
import org.apache.nifi.controller.status.history.storage.NodeStatusStorage;
import org.apache.nifi.controller.status.history.storage.ProcessorStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbConnectionStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbGarbageCollectionStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbNodeStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbProcessGroupStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbProcessorStatusStorage;
import org.apache.nifi.controller.status.history.storage.questdb.QuestDbRemoteProcessGroupStatusStorage;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.class */
public class EmbeddedQuestDbStatusHistoryRepository implements StatusHistoryRepository {
    private static final int PERSIST_BATCH_SIZE = 1000;
    private final InMemoryComponentDetailsStorage componentDetailsProvider;
    private final ScheduledExecutorService scheduledExecutorService;
    private final QuestDbContext dbContext;
    private final long persistFrequency;
    private final int daysToKeepNodeData;
    private final int daysToKeepComponentData;
    private final ProcessorStatusStorage processorStatusStorage;
    private final ComponentStatusStorage<ConnectionStatus> connectionStatusStorage;
    private final ComponentStatusStorage<ProcessGroupStatus> processGroupStatusStorage;
    private final ComponentStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusStorage;
    private final NodeStatusStorage nodeStatusStorage;
    private final GarbageCollectionStatusStorage garbageCollectionStatusStorage;
    private final BufferedWriterForStatusStorage<ProcessorStatus> processorStatusWriter;
    private final BufferedWriterForStatusStorage<ConnectionStatus> connectionStatusWriter;
    private final BufferedWriterForStatusStorage<ProcessGroupStatus> processGroupStatusWriter;
    private final BufferedWriterForStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusWriter;
    private final BufferedWriterForStatusStorage<NodeStatus> nodeStatusWriter;
    private final BufferedWriterForStatusStorage<GarbageCollectionStatus> garbageCollectionStatusWriter;
    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbStatusHistoryRepository.class);
    private static final long PERSIST_FREQUENCY = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
    private static final long ROLL_FREQUENCY = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);

    public EmbeddedQuestDbStatusHistoryRepository() {
        this.componentDetailsProvider = new InMemoryComponentDetailsStorage();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(3, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbStatusHistoryRepositoryWorker-%d").build());
        this.dbContext = null;
        this.persistFrequency = PERSIST_FREQUENCY;
        this.daysToKeepNodeData = -1;
        this.daysToKeepComponentData = -1;
        this.processorStatusStorage = null;
        this.connectionStatusStorage = null;
        this.processGroupStatusStorage = null;
        this.remoteProcessGroupStatusStorage = null;
        this.nodeStatusStorage = null;
        this.garbageCollectionStatusStorage = null;
        this.processorStatusWriter = null;
        this.connectionStatusWriter = null;
        this.processGroupStatusWriter = null;
        this.remoteProcessGroupStatusWriter = null;
        this.nodeStatusWriter = null;
        this.garbageCollectionStatusWriter = null;
    }

    public EmbeddedQuestDbStatusHistoryRepository(NiFiProperties niFiProperties) {
        this(niFiProperties, PERSIST_FREQUENCY);
    }

    EmbeddedQuestDbStatusHistoryRepository(NiFiProperties niFiProperties, long j) {
        this.componentDetailsProvider = new InMemoryComponentDetailsStorage();
        this.scheduledExecutorService = Executors.newScheduledThreadPool(3, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbStatusHistoryRepositoryWorker-%d").build());
        Path questDbStatusRepositoryPath = niFiProperties.getQuestDbStatusRepositoryPath();
        DefaultCairoConfiguration defaultCairoConfiguration = new DefaultCairoConfiguration(questDbStatusRepositoryPath.toString());
        QuestDbDatabaseManager.checkDatabaseStatus(questDbStatusRepositoryPath);
        this.persistFrequency = j;
        this.daysToKeepNodeData = getDaysToKeepNodeData(niFiProperties).intValue();
        this.daysToKeepComponentData = getDaysToKeepComponentData(niFiProperties).intValue();
        this.dbContext = new QuestDbContext(new CairoEngine(defaultCairoConfiguration));
        this.nodeStatusStorage = new QuestDbNodeStatusStorage(this.dbContext);
        this.garbageCollectionStatusStorage = new QuestDbGarbageCollectionStatusStorage(this.dbContext);
        this.processorStatusStorage = new QuestDbProcessorStatusStorage(this.dbContext, this.componentDetailsProvider);
        this.connectionStatusStorage = new QuestDbConnectionStatusStorage(this.dbContext, this.componentDetailsProvider);
        this.processGroupStatusStorage = new QuestDbProcessGroupStatusStorage(this.dbContext, this.componentDetailsProvider);
        this.remoteProcessGroupStatusStorage = new QuestDbRemoteProcessGroupStatusStorage(this.dbContext, this.componentDetailsProvider);
        this.nodeStatusWriter = new BufferedWriterForStatusStorage<>(this.nodeStatusStorage, PERSIST_BATCH_SIZE);
        this.garbageCollectionStatusWriter = new BufferedWriterForStatusStorage<>(this.garbageCollectionStatusStorage, PERSIST_BATCH_SIZE);
        this.processorStatusWriter = new BufferedWriterForStatusStorage<>(this.processorStatusStorage, PERSIST_BATCH_SIZE);
        this.connectionStatusWriter = new BufferedWriterForStatusStorage<>(this.connectionStatusStorage, PERSIST_BATCH_SIZE);
        this.processGroupStatusWriter = new BufferedWriterForStatusStorage<>(this.processGroupStatusStorage, PERSIST_BATCH_SIZE);
        this.remoteProcessGroupStatusWriter = new BufferedWriterForStatusStorage<>(this.remoteProcessGroupStatusStorage, PERSIST_BATCH_SIZE);
    }

    public void start() {
        LOGGER.debug("Starting status history repository");
        EmbeddedQuestDbRolloverHandler embeddedQuestDbRolloverHandler = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getNodeTableNames(), this.daysToKeepNodeData, this.dbContext);
        EmbeddedQuestDbRolloverHandler embeddedQuestDbRolloverHandler2 = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getComponentTableNames(), this.daysToKeepComponentData, this.dbContext);
        BufferedWriterFlushWorker bufferedWriterFlushWorker = new BufferedWriterFlushWorker(Arrays.asList(this.nodeStatusWriter, this.garbageCollectionStatusWriter, this.processorStatusWriter, this.connectionStatusWriter, this.processGroupStatusWriter, this.remoteProcessGroupStatusWriter));
        this.scheduledExecutorService.scheduleWithFixedDelay(embeddedQuestDbRolloverHandler, 0L, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleWithFixedDelay(embeddedQuestDbRolloverHandler2, 0L, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
        this.scheduledExecutorService.scheduleWithFixedDelay(bufferedWriterFlushWorker, 0L, this.persistFrequency, TimeUnit.MILLISECONDS);
        LOGGER.debug("Status history repository is started");
    }

    public void shutdown() {
        LOGGER.debug("Status history repository started to shut down");
        this.scheduledExecutorService.shutdown();
        this.dbContext.close();
        LOGGER.debug("Status history repository has been shut down");
    }

    public void capture(NodeStatus nodeStatus, ProcessGroupStatus processGroupStatus, List<GarbageCollectionStatus> list, Date date) {
        captureNodeLevelStatus(nodeStatus, list, date.toInstant());
        captureComponentLevelStatus(processGroupStatus, date.toInstant());
    }

    private void captureComponentLevelStatus(ProcessGroupStatus processGroupStatus, Instant instant) {
        captureComponents(processGroupStatus, instant);
        updateComponentDetails(processGroupStatus);
    }

    private void captureNodeLevelStatus(NodeStatus nodeStatus, List<GarbageCollectionStatus> list, Instant instant) {
        this.nodeStatusWriter.collect((Pair<Instant, NodeStatus>) new ImmutablePair(instant, nodeStatus));
        list.forEach(garbageCollectionStatus -> {
            this.garbageCollectionStatusWriter.collect((Pair<Instant, GarbageCollectionStatus>) new ImmutablePair(instant, garbageCollectionStatus));
        });
    }

    private void captureComponents(ProcessGroupStatus processGroupStatus, Instant instant) {
        this.processGroupStatusWriter.collect((Pair<Instant, ProcessGroupStatus>) new ImmutablePair(instant, processGroupStatus));
        processGroupStatus.getConnectionStatus().forEach(connectionStatus -> {
            this.connectionStatusWriter.collect((Pair<Instant, ConnectionStatus>) new ImmutablePair(instant, connectionStatus));
        });
        processGroupStatus.getRemoteProcessGroupStatus().forEach(remoteProcessGroupStatus -> {
            this.remoteProcessGroupStatusWriter.collect((Pair<Instant, RemoteProcessGroupStatus>) new ImmutablePair(instant, remoteProcessGroupStatus));
        });
        processGroupStatus.getProcessorStatus().forEach(processorStatus -> {
            this.processorStatusWriter.collect((Pair<Instant, ProcessorStatus>) new ImmutablePair(instant, processorStatus));
        });
        processGroupStatus.getProcessGroupStatus().forEach(processGroupStatus2 -> {
            captureComponents(processGroupStatus2, instant);
        });
    }

    private void updateComponentDetails(ProcessGroupStatus processGroupStatus) {
        HashMap hashMap = new HashMap();
        updateComponentDetails(processGroupStatus, hashMap);
        this.componentDetailsProvider.setComponentDetails(hashMap);
    }

    private void updateComponentDetails(ProcessGroupStatus processGroupStatus, Map<String, ComponentDetails> map) {
        map.put(processGroupStatus.getId(), ComponentDetails.forProcessGroup(processGroupStatus));
        processGroupStatus.getConnectionStatus().forEach(connectionStatus -> {
            map.put(connectionStatus.getId(), ComponentDetails.forConnection(connectionStatus));
        });
        processGroupStatus.getRemoteProcessGroupStatus().forEach(remoteProcessGroupStatus -> {
            map.put(remoteProcessGroupStatus.getId(), ComponentDetails.forRemoteProcessGroup(remoteProcessGroupStatus));
        });
        processGroupStatus.getProcessorStatus().forEach(processorStatus -> {
            map.put(processorStatus.getId(), ComponentDetails.forProcessor(processorStatus));
        });
        processGroupStatus.getProcessGroupStatus().forEach(processGroupStatus2 -> {
            updateComponentDetails(processGroupStatus2, map);
        });
    }

    public StatusHistory getConnectionStatusHistory(String str, Date date, Date date2, int i) {
        return this.connectionStatusStorage.read(str, getStartTime(date), getEndTime(date2), i);
    }

    public StatusHistory getProcessGroupStatusHistory(String str, Date date, Date date2, int i) {
        return this.processGroupStatusStorage.read(str, getStartTime(date), getEndTime(date2), i);
    }

    public StatusHistory getProcessorStatusHistory(String str, Date date, Date date2, int i, boolean z) {
        return z ? this.processorStatusStorage.readWithCounter(str, getStartTime(date), getEndTime(date2), i) : this.processorStatusStorage.read(str, getStartTime(date), getEndTime(date2), i);
    }

    public StatusHistory getRemoteProcessGroupStatusHistory(String str, Date date, Date date2, int i) {
        return this.remoteProcessGroupStatusStorage.read(str, getStartTime(date), getEndTime(date2), i);
    }

    public GarbageCollectionHistory getGarbageCollectionHistory(Date date, Date date2) {
        return this.garbageCollectionStatusStorage.read(getStartTime(date), getEndTime(date2));
    }

    public StatusHistory getNodeStatusHistory(Date date, Date date2) {
        return this.nodeStatusStorage.read(getStartTime(date), getEndTime(date2));
    }

    private Integer getDaysToKeepNodeData(NiFiProperties niFiProperties) {
        return niFiProperties.getIntegerProperty("nifi.status.repository.questdb.persist.node.days", 14);
    }

    private Integer getDaysToKeepComponentData(NiFiProperties niFiProperties) {
        return niFiProperties.getIntegerProperty("nifi.status.repository.questdb.persist.component.days", 3);
    }

    private Instant getStartTime(Date date) {
        return date == null ? Instant.now().minus(1L, (TemporalUnit) ChronoUnit.DAYS) : date.toInstant();
    }

    private Instant getEndTime(Date date) {
        return date == null ? Instant.now() : date.toInstant();
    }
}
