package org.apache.nifi.controller.scheduling;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.EventDrivenWorkerQueue;
import org.apache.nifi.controller.NodeTypeProvider;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.lifecycle.TaskTerminationAwareStateManager;
import org.apache.nifi.controller.repository.ActiveProcessSessionFactory;
import org.apache.nifi.controller.repository.BatchingSessionFactory;
import org.apache.nifi.controller.repository.RepositoryContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.repository.WeakHashMapProcessSessionFactory;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.controller.repository.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.class */
public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
    // Shared SLF4J logger; the nested EventDrivenTask logs through it as well.
    private static final Logger logger = LoggerFactory.getLogger(EventDrivenSchedulingAgent.class);
    // Handed to every StandardProcessContext built for a processor.
    private final ControllerServiceProvider serviceProvider;
    // Source of per-component StateManager instances (see getStateManager).
    private final StateManagerProvider stateManagerProvider;
    // Queue that onEvent offers to and that the EventDrivenTask instances poll.
    private final EventDrivenWorkerQueue workerQueue;
    // Builds the RepositoryContext used for each polled connectable.
    private final RepositoryContextFactory contextFactory;
    // Last value given to the constructor or setMaxThreadCount; used to decide how many extra tasks to schedule.
    private final AtomicInteger maxThreadCount;
    // Incremented/decremented by each EventDrivenTask around the handling of one polled worker.
    private final AtomicInteger activeThreadCount;
    // Passed to the process contexts created for triggered components.
    private final PropertyEncryptor encryptor;
    // Used to set up the component NAR loader around onTrigger and @OnStopped invocations.
    private final ExtensionManager extensionManager;
    // Passed to every StandardProcessContext built for a processor.
    private final NodeTypeProvider nodeTypeProvider;
    // Duration string parsed with FormatUtils.getTimeDuration; initialised to "1 sec" by the constructor.
    private volatile String adminYieldDuration;
    // Per-connectable counter handed to RepositoryContextFactory.newProcessContext; entries are created lazily by the tasks.
    private final ConcurrentMap<Connectable, AtomicLong> connectionIndexMap;
    // Lifecycle state registered by doSchedule(Connectable, ...); tasks skip connectables that have no entry.
    private final ConcurrentMap<Connectable, LifecycleState> scheduleStates;

    /* loaded from: input_file:org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent$EventDrivenTask.class */
    private class EventDrivenTask implements Runnable {
        /** Queue this task polls for workers. */
        private final EventDrivenWorkerQueue workerQueue;
        /** Counter this task increments while it is handling a polled worker. */
        private final AtomicInteger activeThreadCount;

        /**
         * Binds the task to the queue it polls and to the busy-thread counter it maintains.
         *
         * @param eventDrivenWorkerQueue queue to poll for workers
         * @param atomicInteger counter incremented while a worker is being handled
         */
        public EventDrivenTask(EventDrivenWorkerQueue eventDrivenWorkerQueue, AtomicInteger atomicInteger) {
            workerQueue = eventDrivenWorkerQueue;
            activeThreadCount = atomicInteger;
        }

        /**
         * Worker loop. Until the flow engine reports shutdown, polls the worker queue (waiting up to one
         * second per poll) and triggers the polled connectable. Workers whose connectable has no entry in
         * {@code scheduleStates} are skipped. The shared active-thread counter is incremented for the
         * duration of each handled worker and always decremented afterwards.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!EventDrivenSchedulingAgent.this.flowEngine.isShutdown()) {
                // NOTE(review): "m4poll" looks like a decompiler-assigned name for poll(timeout, unit);
                // it is kept unchanged so the call keeps resolving against the queue class as it exists here.
                final EventDrivenWorkerQueue.Worker worker = this.workerQueue.m4poll(1L, TimeUnit.SECONDS);
                if (worker == null) {
                    continue;
                }
                final Connectable connectable = worker.getConnectable();
                final LifecycleState lifecycleState = scheduleStates.get(connectable);
                if (lifecycleState == null) {
                    // No lifecycle state registered for this connectable: nothing to run against.
                    continue;
                }

                this.activeThreadCount.incrementAndGet();
                try {
                    final AtomicLong connectionIndex = connectionIndexMap.computeIfAbsent(connectable, key -> new AtomicLong(0L));
                    final RepositoryContext repositoryContext = contextFactory.newProcessContext(connectable, connectionIndex);
                    if (connectable instanceof ProcessorNode) {
                        runProcessor(worker, (ProcessorNode) connectable, lifecycleState, repositoryContext);
                    } else {
                        runConnectable(connectable, lifecycleState, repositoryContext);
                    }
                } finally {
                    this.activeThreadCount.decrementAndGet();
                }
            }
        }

        /**
         * Triggers a processor once, or repeatedly when session batching is supported and a positive run
         * duration is configured, then commits the batch session (if any), records invocation statistics
         * and re-offers the processor when FlowFiles are still queued.
         */
        private void runProcessor(EventDrivenWorkerQueue.Worker worker, ProcessorNode processorNode, LifecycleState lifecycleState, RepositoryContext repositoryContext) {
            final TaskTerminationAwareStateManager stateManager =
                    new TaskTerminationAwareStateManager(getStateManager(processorNode.getIdentifier()), lifecycleState::isTerminated);
            final StandardProcessContext processContext =
                    new StandardProcessContext(processorNode, serviceProvider, encryptor, stateManager, lifecycleState::isTerminated, nodeTypeProvider);

            final long runDuration = processorNode.getRunDuration(TimeUnit.NANOSECONDS);
            final boolean batch = processorNode.isSessionBatchingSupported() && runDuration > 0;

            final StandardProcessSession batchSession;
            final ActiveProcessSessionFactory sessionFactory;
            if (batch) {
                // One shared session is reused across all invocations and committed once at the end.
                batchSession = new StandardProcessSession(repositoryContext, lifecycleState::isTerminated);
                sessionFactory = new WeakHashMapProcessSessionFactory(new BatchingSessionFactory(batchSession));
            } else {
                batchSession = null;
                sessionFactory = new WeakHashMapProcessSessionFactory(new StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated));
            }

            final long startNanos = System.nanoTime();
            final long deadlineNanos = startNanos + runDuration;
            int invocations = 0;
            try {
                boolean moreEvents = true;
                while (moreEvents) {
                    trigger(processorNode, repositoryContext, lifecycleState, processContext, sessionFactory);
                    invocations++;

                    // Only keep looping while batching is on, the run duration has not elapsed and the
                    // processor is still scheduled; otherwise this loop would never terminate.
                    if (!batch || System.nanoTime() > deadlineNanos || !lifecycleState.isScheduled()) {
                        break;
                    }

                    final int remainingEvents = worker.decrementEventCount();
                    if (remainingEvents < 0) {
                        // Went below zero: undo the decrement so the counter is not left negative.
                        worker.incrementEventCount();
                    }
                    moreEvents = remainingEvents > 0;
                }
            } finally {
                if (batchSession != null) {
                    try {
                        batchSession.commitAsync();
                    } catch (RuntimeException e) {
                        logger.error("Unable to commit process session", e);
                    }
                }
                recordInvocations(processorNode, repositoryContext, startNanos, invocations);
            }

            if (Connectables.flowFilesQueued(processorNode)) {
                onEvent(processorNode);
            }
        }

        /** Triggers a non-processor connectable once and re-offers it when FlowFiles are still queued. */
        private void runConnectable(Connectable connectable, LifecycleState lifecycleState, RepositoryContext repositoryContext) {
            final ConnectableProcessContext processContext =
                    new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
            final ActiveProcessSessionFactory sessionFactory =
                    new WeakHashMapProcessSessionFactory(new StandardProcessSessionFactory(repositoryContext, lifecycleState::isTerminated));
            trigger(connectable, lifecycleState, processContext, sessionFactory);
            if (Connectables.flowFilesQueued(connectable)) {
                onEvent(connectable);
            }
        }

        /** Writes elapsed processing time and invocation count to the FlowFile event repository; I/O failures are logged only. */
        private void recordInvocations(Connectable connectable, RepositoryContext repositoryContext, long startNanos, int invocations) {
            try {
                final long processingNanos = System.nanoTime() - startNanos;
                final StandardFlowFileEvent event = new StandardFlowFileEvent();
                event.setProcessingNanos(processingNanos);
                event.setInvocations(invocations);
                repositoryContext.getFlowFileEventRepository().updateRepository(event, connectable.getIdentifier());
            } catch (IOException e) {
                logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
                logger.error("", e);
            }
        }

        /**
         * Triggers a non-processor connectable once.
         *
         * <p>The lifecycle state's active-thread count is incremented first; if that pushes it above a
         * positive {@code getMaxConcurrentTasks()} the increment is undone and nothing runs. Otherwise
         * {@code onTrigger} is invoked under the component NAR loader. A {@link ProcessException} is logged;
         * any other throwable is logged and followed by a sleep of the administrative yield duration.
         * Afterwards, exactly once and regardless of outcome, {@code @OnStopped} methods are invoked when
         * the component is unscheduled, this is the last active thread and the state asks for it, and the
         * active-thread count is decremented.
         *
         * @param connectable component to trigger
         * @param lifecycleState lifecycle state tracking the component's active threads
         * @param connectableProcessContext context passed to {@code onTrigger} and to {@code @OnStopped} methods
         * @param activeProcessSessionFactory session factory passed to {@code onTrigger}
         */
        private void trigger(Connectable connectable, LifecycleState lifecycleState, ConnectableProcessContext connectableProcessContext, ActiveProcessSessionFactory activeProcessSessionFactory) {
            final int threadCount = lifecycleState.incrementActiveThreadCount(activeProcessSessionFactory);
            if (threadCount > connectable.getMaxConcurrentTasks() && connectable.getMaxConcurrentTasks() > 0) {
                // The limit is only known to be exceeded after the increment, so roll it back and skip.
                lifecycleState.decrementActiveThreadCount();
                return;
            }

            try {
                try (NarCloseable narLoader = NarCloseable.withComponentNarLoader(extensionManager, connectable.getClass(), connectable.getIdentifier())) {
                    connectable.onTrigger(connectableProcessContext, activeProcessSessionFactory);
                } catch (ProcessException e) {
                    logger.error("{} failed to process session due to {}", connectable, e.toString());
                } catch (Throwable t) {
                    logger.error("{} failed to process session due to {}", connectable, t.toString());
                    logger.error("", t);
                    logger.warn("{} Administratively Pausing for {} due to processing failure: {}", new Object[]{connectable, getAdministrativeYieldDuration(), t.toString()});
                    logger.warn("", t);
                    try {
                        Thread.sleep(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.MILLISECONDS));
                    } catch (InterruptedException ignored) {
                        // Best effort pause only: an interrupt just shortens the administrative pause.
                    }
                }
            } finally {
                try {
                    if (!lifecycleState.isScheduled() && lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
                        try (NarCloseable narLoader = NarCloseable.withComponentNarLoader(extensionManager, connectable.getClass(), connectable.getIdentifier())) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, new Object[]{connectableProcessContext});
                        }
                    }
                } finally {
                    // Must happen exactly once per successful increment above.
                    lifecycleState.decrementActiveThreadCount();
                }
            }
        }

        /**
         * Triggers a processor once.
         *
         * <p>The lifecycle state's active-thread count is incremented first; if that pushes it above a
         * positive {@code getMaxConcurrentTasks()} the increment is undone and nothing runs. Otherwise
         * {@code onTrigger} is invoked under the processor's NAR loader. A {@link ProcessException} is
         * logged through the processor's logger; any other throwable is logged and the processor is
         * yielded for the administrative yield duration. Afterwards, exactly once and regardless of
         * outcome, {@code @OnStopped} methods are invoked when the processor is unscheduled, this is the
         * last active thread and the state asks for it, and the active-thread count is decremented.
         *
         * @param processorNode processor to trigger
         * @param repositoryContext repository context of the current run (not used by this method)
         * @param lifecycleState lifecycle state tracking the processor's active threads
         * @param standardProcessContext context passed to {@code onTrigger} and to {@code @OnStopped} methods
         * @param activeProcessSessionFactory session factory passed to {@code onTrigger}
         */
        private void trigger(ProcessorNode processorNode, RepositoryContext repositoryContext, LifecycleState lifecycleState, StandardProcessContext standardProcessContext, ActiveProcessSessionFactory activeProcessSessionFactory) {
            final int threadCount = lifecycleState.incrementActiveThreadCount(activeProcessSessionFactory);
            if (threadCount > processorNode.getMaxConcurrentTasks() && processorNode.getMaxConcurrentTasks() > 0) {
                // The limit is only known to be exceeded after the increment, so roll it back and skip.
                lifecycleState.decrementActiveThreadCount();
                return;
            }

            try {
                try (NarCloseable narLoader = NarCloseable.withComponentNarLoader(extensionManager, processorNode.getProcessor().getClass(), processorNode.getIdentifier())) {
                    processorNode.onTrigger(standardProcessContext, activeProcessSessionFactory);
                } catch (ProcessException e) {
                    new SimpleProcessLogger(processorNode.getIdentifier(), processorNode.getProcessor()).error("Failed to process session due to {}", new Object[]{e});
                } catch (Throwable t) {
                    final SimpleProcessLogger processorLog = new SimpleProcessLogger(processorNode.getIdentifier(), processorNode.getProcessor());
                    processorLog.error("{} failed to process session due to {}", new Object[]{processorNode.getProcessor(), t});
                    processorLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{adminYieldDuration});
                    logger.warn("Administratively Yielding {} due to uncaught Exception: ", processorNode.getProcessor());
                    logger.warn("", t);
                    processorNode.yield(FormatUtils.getTimeDuration(adminYieldDuration, TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                }
            } finally {
                try {
                    if (!lifecycleState.isScheduled() && lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
                        try (NarCloseable narLoader = NarCloseable.withComponentNarLoader(extensionManager, processorNode.getProcessor().getClass(), processorNode.getIdentifier())) {
                            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, processorNode.getProcessor(), new Object[]{standardProcessContext});
                        }
                    }
                } finally {
                    // Must happen exactly once per successful increment above.
                    lifecycleState.decrementActiveThreadCount();
                }
            }
        }
    }

    /**
     * Creates the agent and immediately schedules {@code i} {@link EventDrivenTask} instances on the
     * given flow engine (fixed delay of one nanosecond, no initial delay).
     *
     * @param flowEngine engine the worker tasks are scheduled on; also passed to the superclass
     * @param controllerServiceProvider passed to the process contexts built for processors
     * @param stateManagerProvider source of per-component state managers
     * @param eventDrivenWorkerQueue queue the worker tasks poll
     * @param repositoryContextFactory factory for the repository context of each polled connectable
     * @param i initial maximum thread count, i.e. the number of worker tasks scheduled here
     * @param propertyEncryptor passed to the process contexts built for triggered components
     * @param extensionManager used to set up the component NAR loader around component invocations
     * @param nodeTypeProvider passed to the process contexts built for processors
     */
    public EventDrivenSchedulingAgent(FlowEngine flowEngine, ControllerServiceProvider controllerServiceProvider, StateManagerProvider stateManagerProvider, EventDrivenWorkerQueue eventDrivenWorkerQueue, RepositoryContextFactory repositoryContextFactory, int i, PropertyEncryptor propertyEncryptor, ExtensionManager extensionManager, NodeTypeProvider nodeTypeProvider) {
        super(flowEngine);
        this.activeThreadCount = new AtomicInteger(0);
        this.adminYieldDuration = "1 sec";
        // Diamond operator instead of raw types so the maps are type-checked at construction.
        this.connectionIndexMap = new ConcurrentHashMap<>();
        this.scheduleStates = new ConcurrentHashMap<>();
        this.serviceProvider = controllerServiceProvider;
        this.stateManagerProvider = stateManagerProvider;
        this.workerQueue = eventDrivenWorkerQueue;
        this.contextFactory = repositoryContextFactory;
        this.maxThreadCount = new AtomicInteger(i);
        this.encryptor = propertyEncryptor;
        this.extensionManager = extensionManager;
        this.nodeTypeProvider = nodeTypeProvider;
        // All collaborators are assigned before the first task is handed to the engine.
        for (int scheduled = 0; scheduled < i; scheduled++) {
            flowEngine.scheduleWithFixedDelay(new EventDrivenTask(eventDrivenWorkerQueue, this.activeThreadCount), 0L, 1L, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * @return current value of the counter the worker tasks increment while handling a polled worker
     */
    public int getActiveThreadCount() {
        final int busy = activeThreadCount.get();
        return busy;
    }

    /**
     * Looks up the state manager for a component through the configured provider.
     *
     * @param str identifier of the component
     * @return whatever the provider returns for that identifier
     */
    private StateManager getStateManager(String str) {
        final StateManager manager = stateManagerProvider.getStateManager(str);
        return manager;
    }

    /** Shuts down the underlying flow engine; the worker tasks stop looping once it reports shutdown. */
    public void shutdown() {
        flowEngine.shutdown();
    }

    /**
     * Always rejects the request.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public void doSchedule(ReportingTaskNode reportingTaskNode, LifecycleState lifecycleState) {
        final String reason = "ReportingTasks cannot be scheduled in Event-Driven Mode";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Always rejects the request.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public void doUnschedule(ReportingTaskNode reportingTaskNode, LifecycleState lifecycleState) {
        final String reason = "ReportingTasks cannot be scheduled in Event-Driven Mode";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Registers the lifecycle state for the connectable and resumes its work on the worker queue.
     *
     * <p>The state is published before work is resumed: the worker tasks skip any polled connectable
     * that has no entry in {@code scheduleStates}, so resuming first could let a task poll the
     * connectable and discard the event (or run against a previously registered state).
     *
     * @param connectable component to run in event-driven mode
     * @param lifecycleState lifecycle state the worker tasks will use for this component
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public void doSchedule(Connectable connectable, LifecycleState lifecycleState) {
        this.scheduleStates.put(connectable, lifecycleState);
        this.workerQueue.resumeWork(connectable);
        logger.info("Scheduled {} to run in Event-Driven mode", connectable);
    }

    /**
     * Not supported by this agent.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    protected void doScheduleOnce(Connectable connectable, LifecycleState lifecycleState, Callable<Future<Void>> callable) {
        throw new UnsupportedOperationException();
    }

    /**
     * Suspends work for the connectable on the worker queue and logs it. The entry in
     * {@code scheduleStates} is left in place.
     *
     * @param connectable component to stop scheduling
     * @param lifecycleState unused here
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public void doUnschedule(Connectable connectable, LifecycleState lifecycleState) {
        workerQueue.suspendWork(connectable);
        logger.info("Stopped scheduling {} to run", connectable);
    }

    /**
     * Offers the connectable to the worker queue so that a worker task can pick it up.
     *
     * @param connectable component that has work to do
     */
    public void onEvent(Connectable connectable) {
        workerQueue.offer(connectable);
    }

    /**
     * Stores the new maximum and, when it is larger than the previous one, schedules one additional
     * worker task per added thread. Lowering the maximum only updates the stored value.
     *
     * @param i new maximum thread count
     */
    public void setMaxThreadCount(int i) {
        final int previousMax = maxThreadCount.getAndSet(i);
        if (i <= previousMax) {
            return;
        }
        int tasksToAdd = i - previousMax;
        while (tasksToAdd-- > 0) {
            final EventDrivenTask task = new EventDrivenTask(workerQueue, activeThreadCount);
            flowEngine.scheduleWithFixedDelay(task, 0L, 1L, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Does nothing in this class: the body is empty and the argument is ignored.
     * NOTE(review): confirm that a no-op is intended here rather than a lost implementation.
     *
     * @param i ignored
     */
    public void incrementMaxThreadCount(int i) {
    }

    /**
     * Replaces the administrative yield duration applied after a component fails with an unexpected error.
     *
     * @param str duration string; it is later parsed with {@code FormatUtils.getTimeDuration}
     */
    public void setAdministrativeYieldDuration(String str) {
        adminYieldDuration = str;
    }

    /**
     * @return the administrative yield duration exactly as it was last set ("1 sec" initially)
     */
    public String getAdministrativeYieldDuration() {
        final String duration = adminYieldDuration;
        return duration;
    }

    /**
     * Parses the administrative yield duration into the requested unit.
     *
     * @param timeUnit unit to express the duration in
     * @return result of {@code FormatUtils.getTimeDuration} for the current duration string
     */
    public long getAdministrativeYieldDuration(TimeUnit timeUnit) {
        final String duration = adminYieldDuration;
        return FormatUtils.getTimeDuration(duration, timeUnit);
    }

    /**
     * Forwards unchanged to the superclass {@code unschedule(ReportingTaskNode, LifecycleState)}.
     * NOTE(review): the bridge/synthetic markers suggest this override is a decompilation artifact — confirm before relying on it.
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public /* bridge */ /* synthetic */ void unschedule(ReportingTaskNode reportingTaskNode, LifecycleState lifecycleState) {
        super.unschedule(reportingTaskNode, lifecycleState);
    }

    /**
     * Forwards unchanged to the superclass {@code schedule(ReportingTaskNode, LifecycleState)}.
     * NOTE(review): the bridge/synthetic markers suggest this override is a decompilation artifact — confirm before relying on it.
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public /* bridge */ /* synthetic */ void schedule(ReportingTaskNode reportingTaskNode, LifecycleState lifecycleState) {
        super.schedule(reportingTaskNode, lifecycleState);
    }

    /**
     * Forwards unchanged to the superclass {@code unschedule(Connectable, LifecycleState)}.
     * NOTE(review): the bridge/synthetic markers suggest this override is a decompilation artifact — confirm before relying on it.
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public /* bridge */ /* synthetic */ void unschedule(Connectable connectable, LifecycleState lifecycleState) {
        super.unschedule(connectable, lifecycleState);
    }

    /**
     * Forwards unchanged to the superclass {@code scheduleOnce(Connectable, LifecycleState, Callable)}.
     * NOTE(review): the bridge/synthetic markers suggest this override is a decompilation artifact — confirm before relying on it.
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public /* bridge */ /* synthetic */ void scheduleOnce(Connectable connectable, LifecycleState lifecycleState, Callable callable) {
        super.scheduleOnce(connectable, lifecycleState, callable);
    }

    /**
     * Forwards unchanged to the superclass {@code schedule(Connectable, LifecycleState)}.
     * NOTE(review): the bridge/synthetic markers suggest this override is a decompilation artifact — confirm before relying on it.
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public /* bridge */ /* synthetic */ void schedule(Connectable connectable, LifecycleState lifecycleState) {
        super.schedule(connectable, lifecycleState);
    }
}
