/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.scheduling;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.scheduling.LifecycleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StatelessSchedulingAgent;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ProcessScheduler} implementation used by the stateless engine.
 *
 * <p>Most scheduling callbacks are intentionally no-ops in this implementation. The operations
 * that do perform work (starting/stopping processors, enabling/disabling controller services,
 * scheduling reporting tasks) delegate to the component node and run on thread pools that are
 * created by {@link #initialize(ProcessContextFactory, DataflowDefinition)}. Those operations
 * therefore require {@code initialize} to have been called first.
 */
public class StatelessProcessScheduler implements ProcessScheduler {
    private static final Logger logger = LoggerFactory.getLogger(StatelessProcessScheduler.class);

    /** Administrative yield duration, in milliseconds, handed to processors and controller services. */
    private static final int ADMINISTRATIVE_YIELD_MILLIS = 1000;

    /** Timeout, in milliseconds, handed to {@code ProcessorNode.start(...)}. */
    private static final int PROCESSOR_START_TIMEOUT_MILLIS = 10000;

    private final SchedulingAgent schedulingAgent;
    private final ExtensionManager extensionManager;

    // The following three fields are populated by initialize(...) and are null until then.
    private FlowEngine componentLifeCycleThreadPool;
    private ScheduledExecutorService componentMonitoringThreadPool;
    private ProcessContextFactory processContextFactory;

    /**
     * Creates a scheduler backed by a {@link StatelessSchedulingAgent}.
     *
     * @param extensionManager extension manager used to set up the component NAR class loader
     *                         when lifecycle methods are invoked
     */
    public StatelessProcessScheduler(ExtensionManager extensionManager) {
        this.extensionManager = extensionManager;
        this.schedulingAgent = new StatelessSchedulingAgent(extensionManager);
    }

    /**
     * Shuts down the lifecycle and monitoring thread pools, if they have been created.
     */
    public void shutdown() {
        if (componentLifeCycleThreadPool != null) {
            componentLifeCycleThreadPool.shutdown();
        }
        if (componentMonitoringThreadPool != null) {
            componentMonitoringThreadPool.shutdown();
        }
    }

    /**
     * Invokes the {@link OnShutdown} methods of the given controller service, with the
     * service's NAR class loader in effect for the duration of the call.
     *
     * @param serviceNode               the controller service being shut down
     * @param controllerServiceProvider lookup handed to the configuration context
     */
    public void shutdownControllerService(ControllerServiceNode serviceNode, ControllerServiceProvider controllerServiceProvider) {
        final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
        try (NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
            final ConfigurationContext configContext =
                    new StandardConfigurationContext(serviceNode, controllerServiceProvider, null, VariableRegistry.EMPTY_REGISTRY);
            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
        }
    }

    /**
     * Invokes the {@link OnShutdown} methods of the given reporting task, with the task's NAR
     * class loader in effect for the duration of the call.
     *
     * @param taskNode the reporting task being shut down
     */
    public void shutdownReportingTask(ReportingTaskNode taskNode) {
        final ConfigurationContext configContext = taskNode.getConfigurationContext();
        try (NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, taskNode.getReportingTask().getClass(), taskNode.getIdentifier())) {
            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
        }
    }

    /**
     * Stores the process context factory and creates the two thread pools used by this
     * scheduler. Thread names carry the flow name as a suffix when the dataflow has one.
     *
     * @param processContextFactory factory used to build a {@link ProcessContext} per processor
     * @param dataflowDefinition    dataflow whose name is used for thread naming
     */
    public void initialize(ProcessContextFactory processContextFactory, DataflowDefinition dataflowDefinition) {
        this.processContextFactory = processContextFactory;

        final String flowName = dataflowDefinition.getFlowName();
        final String threadNameSuffix = (flowName == null) ? "" : " for dataflow " + flowName;

        this.componentLifeCycleThreadPool = new FlowEngine(8, "Component Lifecycle" + threadNameSuffix, true);
        this.componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle" + threadNameSuffix, true);
    }

    /**
     * Asks the processor node to start.
     *
     * @param procNode       the processor to start
     * @param failIfStopping passed through unchanged to {@code ProcessorNode.start(...)}
     * @return a future that is completed once the node invokes {@code trigger()} on the
     *         scheduling callback supplied by this method
     */
    public Future<Void> startProcessor(ProcessorNode procNode, boolean failIfStopping) {
        final CompletableFuture<Void> started = new CompletableFuture<>();

        final SchedulingAgentCallback callback = new SchedulingAgentCallback() {

            public void trigger() {
                // Signals the caller of startProcessor(...) that the node has triggered.
                started.complete(null);
            }

            public Future<?> scheduleTask(Callable<?> task) {
                return componentLifeCycleThreadPool.submit(task);
            }

            public void onTaskComplete() {
                // Nothing to do on task completion in this scheduler.
            }
        };

        logger.info("Starting {}", procNode);

        final Supplier<ProcessContext> processContextSupplier = () -> processContextFactory.createProcessContext(procNode);
        procNode.start(componentMonitoringThreadPool, ADMINISTRATIVE_YIELD_MILLIS, PROCESSOR_START_TIMEOUT_MILLIS,
                processContextSupplier, callback, failIfStopping);
        return started;
    }

    /**
     * Not supported by this scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    public Future<Void> runProcessorOnce(ProcessorNode procNode, Callable<Future<Void>> stopCallback) {
        throw new UnsupportedOperationException();
    }

    /**
     * Asks the processor node to stop, running the stop logic on the lifecycle thread pool.
     *
     * @param procNode the processor to stop
     * @return the future returned by {@code ProcessorNode.stop(...)}
     */
    public Future<Void> stopProcessor(ProcessorNode procNode) {
        logger.info("Stopping {}", procNode);

        final ProcessContext processContext = processContextFactory.createProcessContext(procNode);

        // A fresh LifecycleState is built for the stop call. It is flagged as scheduled when the
        // node reports RUNNING or still has active threads.
        final boolean scheduled = procNode.getScheduledState() == ScheduledState.RUNNING
                || procNode.getActiveThreadCount() > 0;
        final LifecycleState lifecycleState = new LifecycleState();
        lifecycleState.setScheduled(scheduled);

        return procNode.stop(this, componentLifeCycleThreadPool, processContext, schedulingAgent, lifecycleState);
    }

    /** No-op in this scheduler. */
    public void terminateProcessor(ProcessorNode procNode) {
    }

    /** No-op in this scheduler. */
    public void onProcessorRemoved(ProcessorNode procNode) {
    }

    /** No-op in this scheduler. */
    public void onPortRemoved(Port port) {
    }

    /** No-op in this scheduler. */
    public void onFunnelRemoved(Funnel funnel) {
    }

    /** No-op in this scheduler. */
    public void onReportingTaskRemoved(ReportingTaskNode reportingTask) {
    }

    /**
     * Notifies the port that scheduling is starting.
     *
     * @param port the port to start
     * @throws IllegalStateException if the port reports that it is not valid
     */
    public void startPort(Port port) {
        if (!port.isValid()) {
            throw new IllegalStateException("Port " + port.getIdentifier() + " is not in a valid state");
        }
        port.onSchedulingStart();
    }

    /** No-op in this scheduler. */
    public void stopPort(Port port) {
    }

    /** No-op in this scheduler. */
    public void startFunnel(Funnel funnel) {
    }

    /** No-op in this scheduler. */
    public void stopFunnel(Funnel funnel) {
    }

    /** No-op in this scheduler. */
    public void enableFunnel(Funnel funnel) {
    }

    /** No-op in this scheduler. */
    public void enablePort(Port port) {
    }

    /**
     * Enables the given processor by delegating to {@code ProcessorNode.enable()}.
     */
    public void enableProcessor(ProcessorNode procNode) {
        procNode.enable();
    }

    /** No-op in this scheduler. */
    public void disableFunnel(Funnel funnel) {
    }

    /** No-op in this scheduler. */
    public void disablePort(Port port) {
    }

    /**
     * Disables the given processor by delegating to {@code ProcessorNode.disable()}.
     */
    public void disableProcessor(ProcessorNode procNode) {
        procNode.disable();
    }

    /**
     * @return always {@code 0}; this scheduler does not track active threads
     */
    public int getActiveThreadCount(Object scheduled) {
        return 0;
    }

    /**
     * @return always {@code false}; this scheduler does not track scheduled components
     */
    public boolean isScheduled(Object scheduled) {
        return false;
    }

    /** No-op in this scheduler. */
    public void registerEvent(Connectable worker) {
    }

    /** No-op in this scheduler. */
    public void setMaxThreadCount(SchedulingStrategy strategy, int maxThreadCount) {
    }

    /** No-op in this scheduler. */
    public void yield(ProcessorNode procNode) {
    }

    /**
     * Performs no work for the given reporting task.
     *
     * @return an already-completed future
     */
    public Future<Void> unschedule(ReportingTaskNode taskNode) {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Schedules the reporting task asynchronously on the lifecycle thread pool. If validation,
     * the {@link OnScheduled} invocation, or the hand-off to the scheduling agent throws, the
     * failure is logged and the same attempt is re-queued to run again after 30 seconds.
     *
     * @param taskNode the reporting task to schedule
     */
    public void schedule(final ReportingTaskNode taskNode) {
        // An anonymous class (rather than a lambda) is required so the task can re-submit itself.
        final Runnable scheduleTask = new Runnable() {

            @Override
            public void run() {
                try {
                    attemptSchedule(taskNode);
                    schedulingAgent.schedule(taskNode, new LifecycleState());
                    logger.info("Successfully scheduled {} to run every {}", taskNode, taskNode.getSchedulingPeriod());
                } catch (Exception e) {
                    // The trailing exception argument is logged with its stack trace by SLF4J.
                    logger.error("Could not schedule {} to run. Will try again in 30 seconds.", taskNode, e);
                    componentLifeCycleThreadPool.schedule(this, 30L, TimeUnit.SECONDS);
                }
            }
        };

        componentLifeCycleThreadPool.submit(scheduleTask);
    }

    /**
     * Validates the reporting task and, if valid, invokes its {@link OnScheduled} methods with
     * the task's NAR class loader in effect.
     *
     * @param taskNode the reporting task to prepare for scheduling
     * @throws IllegalStateException     if validation does not yield {@link ValidationStatus#VALID}
     * @throws InvocationTargetException propagated from the reflective {@code OnScheduled} invocation
     * @throws IllegalAccessException    propagated from the reflective {@code OnScheduled} invocation
     */
    private void attemptSchedule(ReportingTaskNode taskNode) throws InvocationTargetException, IllegalAccessException {
        final ValidationStatus validation = taskNode.performValidation();
        if (validation != ValidationStatus.VALID) {
            throw new IllegalStateException("Cannot start Reporting Task " + taskNode + " because it is not valid: " + taskNode.getValidationErrors());
        }

        final ReportingTask reportingTask = taskNode.getReportingTask();
        try (NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, reportingTask.getClass(), taskNode.getIdentifier())) {
            ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
        }
    }

    /**
     * Enables the controller service using the lifecycle thread pool.
     *
     * @param service the controller service to enable
     * @return the future returned by {@code ControllerServiceNode.enable(...)}
     */
    public CompletableFuture<Void> enableControllerService(ControllerServiceNode service) {
        logger.info("Enabling {}", service);
        return service.enable(componentLifeCycleThreadPool, ADMINISTRATIVE_YIELD_MILLIS);
    }

    /**
     * Disables the controller service using the lifecycle thread pool.
     *
     * @param service the controller service to disable
     * @return the future returned by {@code ControllerServiceNode.disable(...)}
     */
    public CompletableFuture<Void> disableControllerService(ControllerServiceNode service) {
        logger.info("Disabling {}", service);
        return service.disable(componentLifeCycleThreadPool);
    }

    /**
     * Disables every controller service in the given list.
     *
     * @param services the services to disable; may be {@code null} or empty
     * @return a future that completes once all per-service disable futures have completed
     *         (already completed when {@code services} is {@code null} or empty)
     */
    public CompletableFuture<Void> disableControllerServices(List<ControllerServiceNode> services) {
        if (services == null || services.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }

        // Kick off every disable first, then combine them with a single allOf(...) instead of
        // building a nested chain of pairwise futures.
        final CompletableFuture<?>[] pending = new CompletableFuture<?>[services.size()];
        int index = 0;
        for (final ControllerServiceNode service : services) {
            pending[index++] = disableControllerService(service);
        }
        return CompletableFuture.allOf(pending);
    }

    /**
     * Submits a framework task for execution on the lifecycle thread pool.
     *
     * @param task the task to run
     * @return a future representing the pending completion of the task; never {@code null}
     */
    public Future<?> submitFrameworkTask(Runnable task) {
        // Returning null here would silently discard the task and hand callers a null Future to
        // dereference; run it on the same pool used for other lifecycle work instead.
        return componentLifeCycleThreadPool.submit(task);
    }
}

