/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.stateless.flow;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.LocalPort;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.NopPerformanceTracker;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.TerminatedTaskException;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.engine.StandardExecutionProgress;
import org.apache.nifi.stateless.flow.CanceledTriggerResult;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.DataflowTriggerContext;
import org.apache.nifi.stateless.flow.ExceptionalTriggerResult;
import org.apache.nifi.stateless.flow.StandardStatelessDataflowValidation;
import org.apache.nifi.stateless.flow.StandardStatelessFlowCurrent;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowValidation;
import org.apache.nifi.stateless.flow.TransactionThresholdMeter;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.Connectables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardStatelessFlow
implements StatelessDataflow {
    private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlow.class);
    private static final long COMPONENT_ENABLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    private static final long TEN_MILLIS_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
    private static final String PARENT_FLOW_GROUP_ID = "stateless-flow";
    private final ProcessGroup rootGroup;
    private final List<Connection> allConnections;
    private final List<ReportingTaskNode> reportingTasks;
    private final Set<Connectable> rootConnectables;
    private final ControllerServiceProvider controllerServiceProvider;
    private final ProcessContextFactory processContextFactory;
    private final RepositoryContextFactory repositoryContextFactory;
    private final List<FlowFileQueue> internalFlowFileQueues;
    private final DataflowDefinition dataflowDefinition;
    private final StatelessStateManagerProvider stateManagerProvider;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ProcessScheduler processScheduler;
    private final AsynchronousCommitTracker tracker = new AsynchronousCommitTracker();
    private final TransactionThresholdMeter transactionThresholdMeter;
    private final List<BackgroundTask> backgroundTasks = new ArrayList<BackgroundTask>();
    private final BulletinRepository bulletinRepository;
    private volatile ExecutorService runDataflowExecutor;
    private volatile ScheduledExecutorService backgroundTaskExecutor;
    private volatile boolean initialized = false;
    private volatile Boolean stateful = null;

    public StandardStatelessFlow(ProcessGroup rootGroup, List<ReportingTaskNode> reportingTasks, ControllerServiceProvider controllerServiceProvider, ProcessContextFactory processContextFactory, RepositoryContextFactory repositoryContextFactory, DataflowDefinition dataflowDefinition, StatelessStateManagerProvider stateManagerProvider, ProcessScheduler processScheduler, BulletinRepository bulletinRepository) {
        this.rootGroup = rootGroup;
        this.allConnections = rootGroup.findAllConnections();
        this.reportingTasks = reportingTasks;
        this.controllerServiceProvider = controllerServiceProvider;
        this.processContextFactory = processContextFactory;
        this.repositoryContextFactory = repositoryContextFactory;
        this.dataflowDefinition = dataflowDefinition;
        this.stateManagerProvider = stateManagerProvider;
        this.processScheduler = processScheduler;
        this.transactionThresholdMeter = new TransactionThresholdMeter(dataflowDefinition.getTransactionThresholds());
        this.bulletinRepository = bulletinRepository;
        this.rootConnectables = new HashSet<Connectable>();
        this.discoverRootProcessors(rootGroup, this.rootConnectables);
        this.discoverRootRemoteGroupPorts(rootGroup, this.rootConnectables);
        this.discoverRootInputPorts(rootGroup, this.rootConnectables);
        this.internalFlowFileQueues = this.discoverInternalFlowFileQueues(rootGroup);
    }

    private List<FlowFileQueue> discoverInternalFlowFileQueues(ProcessGroup group) {
        Set rootGroupInputPorts = this.rootGroup.getInputPorts();
        Set rootGroupOutputPorts = this.rootGroup.getOutputPorts();
        return group.findAllConnections().stream().filter(connection -> !rootGroupInputPorts.contains(connection.getSource())).filter(connection -> !rootGroupOutputPorts.contains(connection.getDestination())).map(Connection::getFlowFileQueue).distinct().collect(Collectors.toCollection(ArrayList::new));
    }

    private void discoverRootInputPorts(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        for (Port port : processGroup.getInputPorts()) {
            for (Connection connection : port.getConnections()) {
                Connectable connectable = connection.getDestination();
                if (StandardStatelessFlow.isTerminalPort(connectable)) continue;
                rootComponents.add(connectable);
            }
        }
    }

    private void discoverRootProcessors(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        for (ProcessorNode processor : processGroup.findAllProcessors()) {
            if (Connectables.hasNonLoopConnection((Connectable)processor)) continue;
            rootComponents.add((Connectable)processor);
        }
    }

    private void discoverRootRemoteGroupPorts(ProcessGroup processGroup, Set<Connectable> rootComponents) {
        List rpgs = processGroup.findAllRemoteProcessGroups();
        for (RemoteProcessGroup rpg : rpgs) {
            Set remoteGroupPorts = rpg.getOutputPorts();
            for (RemoteGroupPort remoteGroupPort : remoteGroupPorts) {
                if (remoteGroupPort.getConnections().isEmpty()) continue;
                rootComponents.add((Connectable)remoteGroupPort);
            }
        }
    }

    public static boolean isTerminalPort(Connectable connectable) {
        ConnectableType connectableType = connectable.getConnectableType();
        if (connectableType != ConnectableType.OUTPUT_PORT) {
            return false;
        }
        ProcessGroup portGroup = connectable.getProcessGroup();
        if (PARENT_FLOW_GROUP_ID.equals(portGroup.getIdentifier())) {
            logger.debug("FlowFiles queued for {} but this is a Terminal Port. Will not trigger Port to run.", (Object)connectable);
            return true;
        }
        return false;
    }

    public void initialize() {
        if (this.initialized) {
            logger.debug("{} initialize() was called, but dataflow has already been initialized. Returning without doing anything.", (Object)this);
            return;
        }
        this.initialized = true;
        this.performValidation();
        try {
            long serviceEnableStart = System.currentTimeMillis();
            this.enableControllerServices(this.rootGroup);
            this.waitForServicesEnabled(this.rootGroup);
            long serviceEnableMillis = System.currentTimeMillis() - serviceEnableStart;
            long validationStart = System.currentTimeMillis();
            StatelessDataflowValidation validationResult = this.performValidation();
            long validationMillis = System.currentTimeMillis() - validationStart;
            if (!validationResult.isValid()) {
                logger.warn("{} Attempting to initialize dataflow but found at least one invalid component: {}", (Object)this, (Object)validationResult);
            }
            this.startProcessors(this.rootGroup);
            this.startRemoteGroups(this.rootGroup);
            this.startReportingTasks();
            long initializationMillis = System.currentTimeMillis() - validationStart;
            logger.info("Successfully initialized components in {} millis ({} millis to perform validation, {} millis for services to enable)", new Object[]{initializationMillis, validationMillis, serviceEnableMillis});
            String flowName = this.dataflowDefinition.getFlowName();
            String threadName = flowName == null || flowName.trim().isEmpty() ? "Run Dataflow" : "Run Dataflow " + flowName;
            this.runDataflowExecutor = Executors.newFixedThreadPool(1, this.createNamedThreadFactory(threadName, false));
            this.backgroundTaskExecutor = Executors.newScheduledThreadPool(1, this.createNamedThreadFactory("Background Tasks", true));
            this.backgroundTasks.forEach(task -> this.backgroundTaskExecutor.scheduleWithFixedDelay(task.getTask(), task.getSchedulingPeriod(), task.getSchedulingPeriod(), task.getSchedulingUnit()));
        }
        catch (Throwable t) {
            this.processScheduler.shutdown();
            throw t;
        }
    }

    private ThreadFactory createNamedThreadFactory(String name, boolean daemon) {
        return r -> {
            Thread thread = Executors.defaultThreadFactory().newThread(r);
            thread.setName(name);
            thread.setDaemon(daemon);
            return thread;
        };
    }

    public void scheduleBackgroundTask(Runnable task, long period, TimeUnit unit) {
        this.backgroundTasks.add(new BackgroundTask(task, period, unit));
    }

    private void waitForServicesEnabled(ProcessGroup group) {
        long startTime = System.currentTimeMillis();
        long cutoff = startTime + COMPONENT_ENABLE_TIMEOUT_MILLIS;
        Set serviceNodes = group.findAllControllerServices();
        for (ControllerServiceNode serviceNode : serviceNodes) {
            boolean enabled;
            try {
                enabled = serviceNode.awaitEnabled(cutoff - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for Controller Services to enable", ie);
            }
            if (enabled || System.currentTimeMillis() <= cutoff) continue;
            String validationErrors = this.performValidation().toString();
            throw new IllegalStateException("At least one Controller Service never finished enabling. All validation errors: " + validationErrors);
        }
    }

    private void startReportingTasks() {
        this.reportingTasks.forEach(this::startReportingTask);
    }

    private void startReportingTask(ReportingTaskNode taskNode) {
        this.processScheduler.schedule(taskNode);
    }

    public void shutdown() {
        if (this.runDataflowExecutor != null) {
            this.runDataflowExecutor.shutdown();
        }
        if (this.backgroundTaskExecutor != null) {
            this.backgroundTaskExecutor.shutdown();
        }
        this.rootGroup.stopProcessing();
        this.rootGroup.findAllRemoteProcessGroups().forEach(RemoteProcessGroup::shutdown);
        this.rootGroup.shutdown();
        Set allControllerServices = this.rootGroup.findAllControllerServices();
        this.controllerServiceProvider.disableControllerServicesAsync((Collection)allControllerServices);
        this.reportingTasks.forEach(arg_0 -> ((ProcessScheduler)this.processScheduler).unschedule(arg_0));
        this.stateManagerProvider.shutdown();
        allControllerServices.forEach(cs -> this.processScheduler.shutdownControllerService(cs, this.controllerServiceProvider));
        this.reportingTasks.forEach(arg_0 -> ((ProcessScheduler)this.processScheduler).shutdownReportingTask(arg_0));
        this.processScheduler.shutdown();
        this.repositoryContextFactory.shutdown();
    }

    public StatelessDataflowValidation performValidation() {
        HashMap<ComponentNode, List<ValidationResult>> resultsMap = new HashMap<ComponentNode, List<ValidationResult>>();
        for (ControllerServiceNode serviceNode : this.rootGroup.findAllControllerServices()) {
            this.performValidation((ComponentNode)serviceNode, resultsMap);
        }
        for (ProcessorNode procNode : this.rootGroup.findAllProcessors()) {
            this.performValidation((ComponentNode)procNode, resultsMap);
        }
        return new StandardStatelessDataflowValidation(resultsMap);
    }

    private void performValidation(ComponentNode componentNode, Map<ComponentNode, List<ValidationResult>> resultsMap) {
        ValidationStatus validationStatus = componentNode.performValidation();
        if (validationStatus == ValidationStatus.VALID) {
            return;
        }
        Collection validationResults = componentNode.getValidationErrors();
        ArrayList<ValidationResult> invalidResults = new ArrayList<ValidationResult>();
        for (ValidationResult result : validationResults) {
            if (result.isValid()) continue;
            invalidResults.add(result);
        }
        resultsMap.put(componentNode, invalidResults);
    }

    private void enableControllerServices(ProcessGroup processGroup) {
        Set services = processGroup.getControllerServices(false);
        for (ControllerServiceNode serviceNode : services) {
            Future future = this.controllerServiceProvider.enableControllerServiceAndDependencies(serviceNode);
            try {
                future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                throw new IllegalStateException("Controller Service " + serviceNode + " has not fully enabled. Current Validation Status is " + serviceNode.getValidationStatus() + " with validation Errors: " + serviceNode.getValidationErrors(), e);
            }
        }
        processGroup.getProcessGroups().forEach(this::enableControllerServices);
    }

    private void startProcessors(ProcessGroup processGroup) {
        Collection processors = processGroup.getProcessors();
        HashMap<ProcessorNode, Future> futures = new HashMap<ProcessorNode, Future>(processors.size());
        for (ProcessorNode processorNode : processors) {
            Future future = processGroup.startProcessor(processorNode, true);
            futures.put(processorNode, future);
        }
        for (Map.Entry entry : futures.entrySet()) {
            ProcessorNode processor = (ProcessorNode)entry.getKey();
            Future future = (Future)entry.getValue();
            long start = System.currentTimeMillis();
            try {
                future.get(COMPONENT_ENABLE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            }
            catch (Exception e) {
                String validationErrors = this.performValidation().toString();
                throw new IllegalStateException("Processor " + processor + " has not fully enabled. Current Validation Status is " + processor.getValidationStatus() + ". All validation errors: " + validationErrors);
            }
            long millis = System.currentTimeMillis() - start;
            logger.debug("Waited {} millis for {} to start", (Object)millis, (Object)processor);
        }
        processGroup.getProcessGroups().forEach(this::startProcessors);
    }

    private void startRemoteGroups(ProcessGroup processGroup) {
        List rpgs = processGroup.findAllRemoteProcessGroups();
        rpgs.forEach(RemoteProcessGroup::initialize);
        rpgs.forEach(RemoteProcessGroup::startTransmitting);
    }

    public DataflowTrigger trigger(DataflowTriggerContext triggerContext) {
        if (!this.initialized) {
            throw new IllegalStateException("Must initialize dataflow before triggering it");
        }
        final LinkedBlockingQueue<TriggerResult> resultQueue = new LinkedBlockingQueue<TriggerResult>();
        final StandardExecutionProgress executionProgress = new StandardExecutionProgress(this.rootGroup, this.internalFlowFileQueues, resultQueue, this.repositoryContextFactory, this.dataflowDefinition.getFailurePortNames(), this.tracker, this.stateManagerProvider, triggerContext, this::purge);
        final AtomicReference processFuture = new AtomicReference();
        DataflowTrigger trigger = new DataflowTrigger(){

            public void cancel() {
                executionProgress.notifyExecutionCanceled();
                Future future = (Future)processFuture.get();
                if (future != null) {
                    future.cancel(true);
                }
            }

            public Optional<TriggerResult> getResultNow() {
                TriggerResult result = (TriggerResult)resultQueue.poll();
                return Optional.ofNullable(result);
            }

            public Optional<TriggerResult> getResult(long maxWaitTime, TimeUnit timeUnit) throws InterruptedException {
                TriggerResult result = (TriggerResult)resultQueue.poll(maxWaitTime, timeUnit);
                return Optional.ofNullable(result);
            }

            public TriggerResult getResult() throws InterruptedException {
                TriggerResult result = (TriggerResult)resultQueue.take();
                return result;
            }
        };
        Future<?> future = this.runDataflowExecutor.submit(() -> this.executeDataflow(resultQueue, executionProgress, this.tracker));
        processFuture.set(future);
        return trigger;
    }

    private void executeDataflow(BlockingQueue<TriggerResult> resultQueue, ExecutionProgress executionProgress, AsynchronousCommitTracker tracker) {
        long startNanos = System.nanoTime();
        this.transactionThresholdMeter.reset();
        StandardStatelessFlowCurrent current = new StandardStatelessFlowCurrent.Builder().commitTracker(tracker).executionProgress(executionProgress).processContextFactory(this.processContextFactory).repositoryContextFactory(this.repositoryContextFactory).rootConnectables(this.rootConnectables).transactionThresholdMeter(this.transactionThresholdMeter).build();
        try {
            current.triggerFlow();
            logger.debug("Completed triggering of components in dataflow. Will now wait for acknowledgment");
            ExecutionProgress.CompletionAction completionAction = executionProgress.awaitCompletionAction();
            switch (completionAction) {
                case CANCEL: {
                    logger.debug("Dataflow was canceled");
                    this.purge();
                    break;
                }
                default: {
                    if (logger.isDebugEnabled()) {
                        long nanos = System.nanoTime() - startNanos;
                        String prettyPrinted = nanos > TEN_MILLIS_IN_NANOS ? TimeUnit.NANOSECONDS.toMillis(nanos) + " millis" : NumberFormat.getInstance().format(nanos) + " nanos";
                        logger.debug("Ran dataflow in {}", (Object)prettyPrinted);
                    }
                    break;
                }
            }
        }
        catch (TerminatedTaskException tte) {
            logger.debug("Caught a TerminatedTaskException", (Throwable)tte);
            this.purge();
            tracker.triggerFailureCallbacks(tte);
            this.stateManagerProvider.rollbackUpdates();
            resultQueue.offer(new CanceledTriggerResult());
        }
        catch (Throwable t) {
            logger.error("Failed to execute dataflow", t);
            this.purge();
            tracker.triggerFailureCallbacks(t);
            this.stateManagerProvider.rollbackUpdates();
            resultQueue.offer(new ExceptionalTriggerResult(t));
        }
    }

    public boolean isStateful() {
        if (this.stateful == null) {
            boolean hasStatefulReportingTask = this.reportingTasks.stream().anyMatch(this::isStateful);
            if (hasStatefulReportingTask) {
                return true;
            }
            this.stateful = this.isStateful(this.rootGroup);
        }
        return this.stateful;
    }

    private boolean isStateful(ProcessGroup processGroup) {
        boolean hasStatefulProcessor = processGroup.getProcessors().stream().anyMatch(this::isStateful);
        if (hasStatefulProcessor) {
            return true;
        }
        boolean hasStatefulControllerService = processGroup.getControllerServices(false).stream().anyMatch(this::isStateful);
        if (hasStatefulControllerService) {
            return true;
        }
        return processGroup.getProcessGroups().stream().anyMatch(this::isStateful);
    }

    private boolean isStateful(ProcessorNode processorNode) {
        Processor processor = processorNode.getProcessor();
        ProcessContext context = this.processContextFactory.createProcessContext((Connectable)processorNode);
        return processor.isStateful(context);
    }

    private boolean isStateful(ControllerServiceNode controllerServiceNode) {
        ControllerService controllerService = controllerServiceNode.getControllerServiceImplementation();
        StandardConfigurationContext context = new StandardConfigurationContext((ComponentNode)controllerServiceNode, (ControllerServiceLookup)this.controllerServiceProvider, null, (VariableRegistry)this.rootGroup.getVariableRegistry());
        return controllerService.isStateful((ConfigurationContext)context);
    }

    private boolean isStateful(ReportingTaskNode reportingTaskNode) {
        ReportingTask reportingTask = reportingTaskNode.getReportingTask();
        return reportingTask.isStateful(reportingTaskNode.getReportingContext());
    }

    public Set<String> getInputPortNames() {
        return this.rootGroup.getInputPorts().stream().map(Connectable::getName).collect(Collectors.toSet());
    }

    public Set<String> getOutputPortNames() {
        return this.rootGroup.getOutputPorts().stream().map(Connectable::getName).collect(Collectors.toSet());
    }

    public QueueSize enqueue(byte[] flowFileContents, Map<String, String> attributes, String portName) {
        QueueSize queueSize;
        ByteArrayInputStream bais = new ByteArrayInputStream(flowFileContents);
        try {
            queueSize = this.enqueue(bais, attributes, portName);
        }
        catch (Throwable throwable) {
            try {
                try {
                    ((InputStream)bais).close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new FlowFileAccessException("Failed to enqueue FlowFile", (Throwable)e);
            }
        }
        ((InputStream)bais).close();
        return queueSize;
    }

    public QueueSize enqueue(InputStream flowFileContents, Map<String, String> attributes, String portName) {
        Port inputPort = this.rootGroup.getInputPortByName(portName);
        if (inputPort == null) {
            throw new IllegalArgumentException("No Input Port exists with name <" + portName + ">. Valid Port names are " + this.getInputPortNames());
        }
        RepositoryContext repositoryContext = this.repositoryContextFactory.createRepositoryContext((Connectable)inputPort);
        StandardProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(repositoryContext, () -> false, (PerformanceTracker)new NopPerformanceTracker());
        ProcessSession session = sessionFactory.createSession();
        try {
            Set portConnections = inputPort.getConnections();
            if (portConnections.isEmpty()) {
                throw new IllegalStateException("Cannot enqueue data for Input Port <" + portName + "> because it has no outgoing connections");
            }
            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, out -> StreamUtils.copy((InputStream)flowFileContents, (OutputStream)out));
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, LocalPort.PORT_RELATIONSHIP);
            session.commitAsync();
            Connection firstConnection = (Connection)portConnections.iterator().next();
            return firstConnection.getFlowFileQueue().size();
        }
        catch (Throwable t) {
            session.rollback();
            throw t;
        }
    }

    public boolean isFlowFileQueued() {
        for (Connection connection : this.allConnections) {
            if (connection.getFlowFileQueue().isActiveQueueEmpty()) continue;
            return true;
        }
        return false;
    }

    public void purge() {
        ArrayList<FlowFileRecord> flowFiles = new ArrayList<FlowFileRecord>();
        for (Connection connection : this.allConnections) {
            ((DrainableFlowFileQueue)connection.getFlowFileQueue()).drainTo(flowFiles);
            flowFiles.clear();
        }
        this.repositoryContextFactory.getContentRepository().purge();
    }

    public Map<String, String> getComponentStates(Scope scope) {
        Map<String, StateMap> stateMaps = this.stateManagerProvider.getAllComponentStates(scope);
        Map<String, String> componentStates = this.serializeStateMaps(stateMaps);
        return componentStates;
    }

    private Map<String, String> serializeStateMaps(Map<String, StateMap> stateMaps) {
        if (stateMaps == null) {
            return Collections.emptyMap();
        }
        HashMap<String, String> serializedStateMaps = new HashMap<String, String>();
        for (Map.Entry<String, StateMap> entry : stateMaps.entrySet()) {
            String serialized;
            String componentId = entry.getKey();
            StateMap stateMap = entry.getValue();
            if (stateMap.getVersion() == -1L) continue;
            SerializableStateMap serializableStateMap = new SerializableStateMap();
            serializableStateMap.setStateValues(stateMap.toMap());
            serializableStateMap.setVersion(stateMap.getVersion());
            try {
                serialized = this.objectMapper.writeValueAsString((Object)serializableStateMap);
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to serialize components' state maps as Strings", e);
            }
            serializedStateMaps.put(componentId, serialized);
        }
        return serializedStateMaps;
    }

    public void setComponentStates(Map<String, String> componentStates, Scope scope) {
        Map<String, StateMap> stateMaps = this.deserializeStateMaps(componentStates);
        this.stateManagerProvider.updateComponentsStates(stateMaps, scope);
    }

    private Map<String, StateMap> deserializeStateMaps(Map<String, String> componentStates) {
        if (componentStates == null) {
            return Collections.emptyMap();
        }
        HashMap<String, StateMap> deserializedStateMaps = new HashMap<String, StateMap>();
        for (Map.Entry<String, String> entry : componentStates.entrySet()) {
            SerializableStateMap deserialized;
            String componentId = entry.getKey();
            String serialized = entry.getValue();
            try {
                deserialized = (SerializableStateMap)this.objectMapper.readValue(serialized, SerializableStateMap.class);
            }
            catch (Exception e) {
                logger.error("Failed to deserialized components' state for component with ID {}. State will be reset to empty", (Object)componentId, (Object)e);
                continue;
            }
            StandardStateMap stateMap = new StandardStateMap(deserialized.getStateValues(), deserialized.getVersion());
            deserializedStateMaps.put(componentId, (StateMap)stateMap);
        }
        return deserializedStateMaps;
    }

    public boolean isSourcePrimaryNodeOnly() {
        for (Connectable connectable : this.rootConnectables) {
            if (!connectable.isIsolated()) continue;
            return true;
        }
        return false;
    }

    public long getSourceYieldExpiration() {
        long latest = 0L;
        for (Connectable connectable : this.rootConnectables) {
            latest = Math.max(latest, connectable.getYieldExpiration());
        }
        return latest;
    }

    public void resetCounters() {
        CounterRepository counterRepo = this.repositoryContextFactory.getCounterRepository();
        counterRepo.getCounters().forEach(counter -> counterRepo.resetCounter(counter.getIdentifier()));
    }

    public Map<String, Long> getCounters(boolean includeGlobalContext) {
        HashMap<String, Long> counters = new HashMap<String, Long>();
        for (Counter counter : this.repositoryContextFactory.getCounterRepository().getCounters()) {
            boolean isGlobalContext;
            boolean bl = isGlobalContext = !counter.getContext().endsWith(")");
            if (!includeGlobalContext && isGlobalContext) continue;
            String counterName = isGlobalContext ? counter.getName() : counter.getName() + " - " + counter.getContext();
            counters.put(counterName, counter.getValue());
        }
        return counters;
    }

    public BulletinRepository getBulletinRepository() {
        return this.bulletinRepository;
    }

    public Set<Processor> findAllProcessors() {
        return this.rootGroup.findAllProcessors().stream().map(ProcessorNode::getProcessor).collect(Collectors.toSet());
    }

    public ContentRepository getContentRepository() {
        return this.repositoryContextFactory.getContentRepository();
    }

    private static class BackgroundTask {
        private final Runnable task;
        private final long schedulingPeriod;
        private final TimeUnit schedulingUnit;

        public BackgroundTask(Runnable task, long schedulingPeriod, TimeUnit schedulingUnit) {
            this.task = task;
            this.schedulingPeriod = schedulingPeriod;
            this.schedulingUnit = schedulingUnit;
        }

        public Runnable getTask() {
            return this.task;
        }

        public long getSchedulingPeriod() {
            return this.schedulingPeriod;
        }

        public TimeUnit getSchedulingUnit() {
            return this.schedulingUnit;
        }
    }

    private static class SerializableStateMap {
        private long version;
        private Map<String, String> stateValues;

        private SerializableStateMap() {
        }

        public long getVersion() {
            return this.version;
        }

        public void setVersion(long version) {
            this.version = version;
        }

        public Map<String, String> getStateValues() {
            return this.stateValues;
        }

        public void setStateValues(Map<String, String> stateValues) {
            this.stateValues = stateValues;
        }
    }
}

