package org.apache.nifi.controller;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.BundleUpdateStrategy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardFlowConfigurationDAO;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.reporting.UserAwareEventAccess;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/StandardFlowService.class */
public class StandardFlowService implements FlowService, ProtocolHandler {
    // Category attached to the bulletin raised when a background save fails.
    private static final String EVENT_CATEGORY = "Controller";
    // Component id used to look up the StateManager holding this node's cluster configuration.
    private static final String CLUSTER_NODE_CONFIG = "Cluster Node Configuration";
    // Key, within that component's LOCAL-scope state, under which the node UUID is read.
    private static final String NODE_UUID = "Node UUID";
    // Controller whose flow this service persists, serializes and shuts down.
    private final FlowController controller;
    // Persistence layer used to save/load the flow configuration.
    private final FlowConfigurationDAO dao;
    // Timeout (seconds) passed to awaitTermination() when the executor is stopped.
    private final int gracefulShutdownSeconds;
    // Value of NiFiProperties.getAutoResumeState() captured at construction.
    private final boolean autoResumeState;
    private final Authorizer authorizer;
    // Null for a standalone instance.
    private final ClusterCoordinator clusterCoordinator;
    private final RevisionManager revisionManager;
    // Created in start(); stop() runs it one final time after the executor is shut down.
    private volatile SaveReportingTask saveReportingTask;
    // Null unless the service is configured for clustering.
    private final NodeProtocolSenderListener senderListener;
    private final boolean configuredForClustering;
    // Only assigned by the constructor when clustering is configured; read through getNodeId().
    private NodeIdentifier nodeId;
    private final NiFiProperties nifiProperties;
    private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster";
    private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);
    // Read/write lock pair: the write lock is taken by save, start, stop and offload;
    // the read lock by flow-request handling and getNodeId().
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = this.rwLock.readLock();
    private final Lock writeLock = this.rwLock.writeLock();
    // True between a successful start() and the next stop().
    private final AtomicBoolean running = new AtomicBoolean(false);
    // Scheduler that periodically runs the SaveReportingTask; set in start().
    private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>(null);
    // The pending deferred-save request, or null when no save is pending.
    private final AtomicReference<SaveHolder> saveHolder = new AtomicReference<>(null);
    private boolean firstControllerInitialization = true;

    /* renamed from: org.apache.nifi.controller.StandardFlowService$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/controller/StandardFlowService$4.class */
    /*
     * Compiler-generated lookup table backing the switch statements over
     * ProtocolMessage.MessageType in canHandle() and handle(). It maps each
     * constant's ordinal() to a dense case label:
     *   1 = RECONNECTION_REQUEST, 2 = OFFLOAD_REQUEST,
     *   3 = DISCONNECTION_REQUEST, 4 = FLOW_REQUEST.
     * Any ordinal not listed keeps the array default of 0 and therefore falls
     * through to the switch's default branch.
     */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType = new int[ProtocolMessage.MessageType.values().length];

        static {
            // Each assignment is guarded separately so that one constant missing
            // at run time (NoSuchFieldError) does not prevent the others from
            // being registered.
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.RECONNECTION_REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.OFFLOAD_REQUEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.DISCONNECTION_REQUEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$protocol$message$ProtocolMessage$MessageType[ProtocolMessage.MessageType.FLOW_REQUEST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/controller/StandardFlowService$SaveHolder.class */
    /**
     * Immutable description of one pending deferred save: when it becomes due
     * and whether the flow should be archived when it is written.
     */
    public class SaveHolder {
        /** The save is performed once this instant lies in the past. */
        private final Calendar saveTime;
        /** Archive flag handed to the DAO together with the controller. */
        private final boolean shouldArchive;

        private SaveHolder(Calendar dueTime, boolean archive) {
            saveTime = dueTime;
            shouldArchive = archive;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/controller/StandardFlowService$SaveReportingTask.class */
    public class SaveReportingTask implements Runnable {
        private SaveReportingTask() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            ClassLoader classLoader = null;
            Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();
            if (frameworkBundle != null) {
                classLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(frameworkBundle.getClassLoader());
            }
            try {
                try {
                    SaveHolder saveHolder = StandardFlowService.this.saveHolder.get();
                    if (saveHolder == null) {
                        if (classLoader != null) {
                            Thread.currentThread().setContextClassLoader(classLoader);
                            return;
                        }
                        return;
                    }
                    if (StandardFlowService.logger.isTraceEnabled()) {
                        StandardFlowService.logger.trace("Save request time {} // Current time {}", saveHolder.saveTime.getTime(), new Date());
                    }
                    if (saveHolder.saveTime.before(Calendar.getInstance())) {
                        if (StandardFlowService.logger.isTraceEnabled()) {
                            StandardFlowService.logger.trace("Waiting for write lock and then will save");
                        }
                        StandardFlowService.this.writeLock.lock();
                        try {
                            StandardFlowService.this.dao.save(StandardFlowService.this.controller, saveHolder.shouldArchive);
                            StandardFlowService.logger.info("Saved flow controller {} // Another save pending = {}", StandardFlowService.this.controller, Boolean.valueOf(!StandardFlowService.this.saveHolder.compareAndSet(saveHolder, null)));
                            StandardFlowService.this.writeLock.unlock();
                        } catch (Throwable th) {
                            StandardFlowService.this.writeLock.unlock();
                            throw th;
                        }
                    }
                    if (classLoader != null) {
                        Thread.currentThread().setContextClassLoader(classLoader);
                    }
                } catch (Throwable th2) {
                    StandardFlowService.logger.error("Unable to save flow controller configuration due to: " + th2, th2);
                    if (StandardFlowService.logger.isDebugEnabled()) {
                        StandardFlowService.logger.error("", th2);
                    }
                    StandardFlowService.this.controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(StandardFlowService.EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable to save flow controller configuration."));
                    if (classLoader != null) {
                        Thread.currentThread().setContextClassLoader(classLoader);
                    }
                }
            } catch (Throwable th3) {
                if (classLoader != null) {
                    Thread.currentThread().setContextClassLoader(classLoader);
                }
                throw th3;
            }
        }
    }

    /**
     * Creates a flow service that is not configured for clustering: no protocol
     * sender/listener and no cluster coordinator are supplied.
     *
     * @throws IOException if the underlying constructor fails with an I/O error
     */
    public static StandardFlowService createStandaloneInstance(FlowController flowController, NiFiProperties niFiProperties, RevisionManager revisionManager, Authorizer authorizer, FlowSerializationStrategy flowSerializationStrategy) throws IOException {
        final NodeProtocolSenderListener noSenderListener = null;
        final ClusterCoordinator noCoordinator = null;
        final boolean clustered = false;
        return new StandardFlowService(flowController, niFiProperties, noSenderListener, clustered, noCoordinator, revisionManager, authorizer, flowSerializationStrategy);
    }

    /**
     * Creates a flow service configured for clustering. The flow is always
     * serialized with {@code FlowSerializationStrategy.WRITE_XML_AND_JSON}.
     *
     * @throws IOException if the underlying constructor fails with an I/O error
     */
    public static StandardFlowService createClusteredInstance(FlowController flowController, NiFiProperties niFiProperties, NodeProtocolSenderListener nodeProtocolSenderListener, ClusterCoordinator clusterCoordinator, RevisionManager revisionManager, Authorizer authorizer) throws IOException {
        final boolean clustered = true;
        final FlowSerializationStrategy strategy = FlowSerializationStrategy.WRITE_XML_AND_JSON;
        return new StandardFlowService(flowController, niFiProperties, nodeProtocolSenderListener, clustered, clusterCoordinator, revisionManager, authorizer, strategy);
    }

    /**
     * Shared constructor behind the two static factories.
     *
     * @param z whether the service is configured for clustering; when true the
     *          sender/listener is registered with this service as a handler and
     *          the node identifier is built from the configured addresses
     * @throws IOException declared for I/O failures raised while reading
     *                     configuration or node state
     */
    private StandardFlowService(FlowController flowController, NiFiProperties niFiProperties, NodeProtocolSenderListener nodeProtocolSenderListener, boolean z, ClusterCoordinator clusterCoordinator, RevisionManager revisionManager, Authorizer authorizer, FlowSerializationStrategy flowSerializationStrategy) throws IOException {
        this.nifiProperties = niFiProperties;
        this.controller = flowController;

        final String shutdownPeriod = niFiProperties.getProperty("nifi.flowcontroller.graceful.shutdown.period");
        this.gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(shutdownPeriod, TimeUnit.SECONDS);
        this.autoResumeState = niFiProperties.getAutoResumeState();
        this.dao = new StandardFlowConfigurationDAO(niFiProperties, flowController.getExtensionManager(), flowSerializationStrategy);

        this.clusterCoordinator = clusterCoordinator;
        if (clusterCoordinator != null) {
            clusterCoordinator.setFlowService(this);
        }
        this.revisionManager = revisionManager;
        this.authorizer = authorizer;

        if (z) {
            this.configuredForClustering = true;
            this.senderListener = nodeProtocolSenderListener;
            nodeProtocolSenderListener.addHandler(this);

            final InetSocketAddress apiAddress = niFiProperties.getNodeApiAddress();
            final InetSocketAddress protocolAddress = niFiProperties.getClusterNodeProtocolAddress();
            final InetSocketAddress loadBalanceAddress = niFiProperties.getClusterLoadBalanceAddress();

            // Reuse the UUID stored in local state when there is one; otherwise propose a random one.
            final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG);
            String nodeUuid = null;
            if (stateManager != null) {
                nodeUuid = stateManager.getState(Scope.LOCAL).get(NODE_UUID);
            }
            if (nodeUuid == null) {
                nodeUuid = UUID.randomUUID().toString();
            }

            this.nodeId = new NodeIdentifier(nodeUuid, apiAddress.getHostName(), apiAddress.getPort(), protocolAddress.getHostName(), protocolAddress.getPort(), loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), niFiProperties.getRemoteInputHost(), niFiProperties.getRemoteInputPort(), niFiProperties.getRemoteInputHttpPort(), niFiProperties.isSiteToSiteSecure().booleanValue());
        } else {
            this.configuredForClustering = false;
            this.senderListener = null;
        }
    }

    /**
     * Saves the controller's flow immediately through the DAO while holding the
     * write lock.
     *
     * @throws IOException if the DAO fails to persist the flow
     */
    public void saveFlowChanges() throws IOException {
        final Lock exclusive = this.writeLock;
        exclusive.lock();
        try {
            this.dao.save(this.controller);
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Requests a deferred save, taking the archive flag from
     * {@code NiFiProperties.isFlowConfigurationArchiveEnabled()}.
     *
     * @param timeUnit unit of {@code j}
     * @param j        delay before the save becomes due
     */
    public void saveFlowChanges(TimeUnit timeUnit, long j) {
        final boolean archiveEnabled = this.nifiProperties.isFlowConfigurationArchiveEnabled();
        saveFlowChanges(timeUnit, j, archiveEnabled);
    }

    /**
     * Requests a deferred save by publishing a new {@link SaveHolder}; any
     * previously pending request is replaced. The save itself is carried out
     * later by the {@link SaveReportingTask}.
     *
     * @param timeUnit unit of {@code j}
     * @param j        delay before the save becomes due
     * @param z        whether the flow should be archived when it is saved
     */
    public void saveFlowChanges(TimeUnit timeUnit, long j, boolean z) {
        final int defaultDelayMillis = 500;
        final Calendar saveTime = Calendar.getInstance();
        final long requestedMillis = TimeUnit.MILLISECONDS.convert(j, timeUnit);

        // Calendar.add takes an int. A delay that does not fit (in either
        // direction) falls back to the default instead of being narrowed, which
        // would silently wrap to an unrelated value.
        final boolean fitsInInt = requestedMillis >= Integer.MIN_VALUE && requestedMillis <= Integer.MAX_VALUE;
        final int delayMillis = fitsInInt ? (int) requestedMillis : defaultDelayMillis;

        saveTime.add(Calendar.MILLISECOND, delayMillis);
        if (logger.isTraceEnabled()) {
            logger.trace(" A request to save the flow has been made with delay {} for time {}", Integer.valueOf(delayMillis), saveTime.getTime());
        }
        this.saveHolder.set(new SaveHolder(saveTime, z));
    }

    /**
     * @return the current value of the {@code running} flag, which start() sets
     *         and stop() clears
     */
    public boolean isRunning() {
        final boolean currentlyRunning = this.running.get();
        return currentlyRunning;
    }

    /**
     * Starts the service: marks it running, schedules the periodic
     * {@link SaveReportingTask} every 500 ms on a new two-thread flow engine
     * and, when configured for clustering, starts the protocol sender/listener.
     * Calling this while already running is a no-op.
     *
     * @throws LifeCycleStartException if an {@link IOException} occurs while
     *         starting; the service is force-stopped before this is thrown
     */
    public void start() throws LifeCycleStartException {
        try {
            this.writeLock.lock();
            try {
                if (isRunning()) {
                    return;
                }
                this.running.set(true);

                final ScheduledExecutorService flowEngine = new FlowEngine(2, "Flow Service Tasks");
                this.saveReportingTask = new SaveReportingTask();
                flowEngine.scheduleWithFixedDelay(this.saveReportingTask, 0L, 500L, TimeUnit.MILLISECONDS);
                this.executor.set(flowEngine);

                if (this.configuredForClustering) {
                    this.senderListener.start();
                }
            } finally {
                // Release exactly once. Unlocking in the body as well as here
                // made every successful start fail with IllegalMonitorStateException.
                this.writeLock.unlock();
            }
        } catch (IOException e) {
            try {
                stop(true);
            } catch (Exception stopFailure) {
                // Best effort: the start failure below is the error that matters to the caller.
                logger.warn("Failed to stop Flow Service after unsuccessful start", stopFailure);
            }
            throw new LifeCycleStartException("Failed to start Flow Service due to: " + e, e);
        }
    }

    /**
     * Stops the service if it is running: shuts down the cluster coordinator,
     * the controller, the protocol sender/listener and the scheduling executor,
     * then runs the save task one last time on the calling thread.
     *
     * @param z {@code true} to force shutdown ({@code shutdownNow} on the
     *          executor, and the flag is passed on to the controller);
     *          {@code false} for an orderly shutdown
     */
    public void stop(boolean z) {
        boolean interrupted = false;
        this.writeLock.lock();
        try {
            if (!isRunning()) {
                return;
            }
            this.running.set(false);

            if (this.clusterCoordinator != null) {
                try {
                    this.clusterCoordinator.shutdown();
                } catch (Throwable th) {
                    logger.error("Failed to properly shutdown coordinator", th);
                }
            }

            if (!this.controller.isTerminated()) {
                this.controller.shutdown(z);
            }

            if (this.configuredForClustering && this.senderListener != null) {
                try {
                    this.senderListener.stop();
                } catch (IOException e) {
                    logger.warn("Protocol sender/listener did not stop gracefully due to: " + e);
                }
            }

            final ScheduledExecutorService scheduler = this.executor.get();
            if (scheduler != null) {
                if (z) {
                    scheduler.shutdownNow();
                } else {
                    scheduler.shutdown();
                }

                boolean terminated = false;
                try {
                    terminated = scheduler.awaitTermination(this.gracefulShutdownSeconds, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Remember the interrupt but restore it only after the final
                    // save, so the save does not run with the flag set.
                    interrupted = true;
                }
                if (!terminated) {
                    logger.warn("Scheduling service did not gracefully shutdown within configured " + this.gracefulShutdownSeconds + " second window");
                }
            }

            // The task is only assigned in start() after the running flag is set,
            // so it can still be null if start() failed part-way through.
            final SaveReportingTask finalSave = this.saveReportingTask;
            if (finalSave != null) {
                finalSave.run();
            }
        } finally {
            this.writeLock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reports whether this service handles the given message.
     *
     * @return {@code true} for reconnection, offload, disconnection and flow
     *         requests; {@code false} for every other message type
     */
    public boolean canHandle(ProtocolMessage protocolMessage) {
        switch (protocolMessage.getType()) {
            case RECONNECTION_REQUEST:
            case OFFLOAD_REQUEST:
            case DISCONNECTION_REQUEST:
            case FLOW_REQUEST:
                return true;
            default:
                return false;
        }
    }

    /**
     * Dispatches a cluster protocol message.
     * <ul>
     *   <li>Reconnection request: heartbeats are suspended, the reconnection
     *       runs on a daemon thread and an empty
     *       {@link ReconnectionResponseMessage} is returned right away.</li>
     *   <li>Offload and disconnection requests: handled on a daemon thread;
     *       {@code null} is returned.</li>
     *   <li>Flow request: handled on the calling thread; the flow response is
     *       returned.</li>
     * </ul>
     * The elapsed time is logged at debug level whether or not handling succeeds.
     *
     * @param protocolMessage the message to handle
     * @param set             not used by this handler
     * @return the response message, or {@code null} when there is none
     * @throws ProtocolException if the message type is not one this handler supports
     */
    public ProtocolMessage handle(final ProtocolMessage protocolMessage, Set<String> set) throws ProtocolException {
        final long startNanos = System.nanoTime();
        try {
            switch (protocolMessage.getType()) {
                case RECONNECTION_REQUEST:
                    this.controller.suspendHeartbeats();
                    startDaemonThread("Reconnect to Cluster",
                            () -> handleReconnectionRequest((ReconnectionRequestMessage) protocolMessage));
                    return new ReconnectionResponseMessage();
                case OFFLOAD_REQUEST:
                    startDaemonThread("Offload Flow Files from Node", () -> {
                        try {
                            handleOffloadRequest((OffloadMessage) protocolMessage);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to anything that still inspects this thread.
                            Thread.currentThread().interrupt();
                            throw new ProtocolException("Could not complete offload request", e);
                        }
                    });
                    return null;
                case DISCONNECTION_REQUEST:
                    startDaemonThread("Disconnect from Cluster",
                            () -> handleDisconnectionRequest((DisconnectMessage) protocolMessage));
                    return null;
                case FLOW_REQUEST:
                    return handleFlowRequest((FlowRequestMessage) protocolMessage);
                default:
                    throw new ProtocolException("Handler cannot handle message type: " + protocolMessage.getType());
            }
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Finished Processing Protocol Message of type {} in {} millis", protocolMessage.getType(), Long.valueOf(TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS)));
            }
        }
    }

    /** Runs {@code task} on a new daemon thread with the given name. */
    private void startDaemonThread(String threadName, Runnable task) {
        final Thread thread = new Thread(task, threadName);
        thread.setDaemon(true);
        thread.start();
    }

    /*
     * NOTE(review): decompiler stub, NOT the real implementation.
     *
     * The decompiler could not reconstruct this method (472 bytecode
     * instructions were skipped), so the body below unconditionally throws
     * UnsupportedOperationException. The declared checked exceptions are all
     * that remains of the original contract. The decompiler notes that follow
     * show one fragment of the lost logic: an exception handler that logs
     * "Unable to start all processors due to invalid flow configuration." at
     * warn level and, when debug logging is enabled, logs the caught exception
     * as well. Restore the method from the original sources before relying on it.
     */
    /* JADX WARN: Can't wrap try/catch for region: R(14:10|11|12|13|(1:15)|16|17|(3:36|37|(5:39|40|22|23|24))|19|20|21|22|23|24) */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x011c, code lost:
    
        r12 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x011e, code lost:
    
        org.apache.nifi.controller.StandardFlowService.logger.warn("Unable to start all processors due to invalid flow configuration.");
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x0130, code lost:
    
        if (org.apache.nifi.controller.StandardFlowService.logger.isDebugEnabled() != false) goto L30;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x0133, code lost:
    
        org.apache.nifi.controller.StandardFlowService.logger.warn("", r12);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void load(org.apache.nifi.cluster.protocol.DataFlow r7) throws java.io.IOException, org.apache.nifi.controller.serialization.FlowSerializationException, org.apache.nifi.controller.serialization.FlowSynchronizationException, org.apache.nifi.controller.UninheritableFlowException, org.apache.nifi.controller.MissingBundleException {
        /*
            Method dump skipped, instructions count: 472
            To view this dump add '--comments-level debug' option
        */
        // Placeholder emitted by the decompiler; always thrown.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.nifi.controller.StandardFlowService.load(org.apache.nifi.cluster.protocol.DataFlow):void");
    }

    /**
     * Reports a failed cluster connection to the coordinator as a
     * node-requested disconnection, then marks the controller as not clustered
     * and the coordinator as not connected.
     *
     * @param exc the failure; its type selects the disconnection code and its
     *            {@code toString()} is sent as the explanation
     */
    private void handleConnectionFailure(Exception exc) {
        final DisconnectionCode code;
        if (exc instanceof UninheritableFlowException) {
            code = DisconnectionCode.MISMATCHED_FLOWS;
        } else if (exc instanceof MissingBundleException) {
            code = DisconnectionCode.MISSING_BUNDLE;
        } else if (exc instanceof FlowSynchronizationException) {
            code = DisconnectionCode.MISMATCHED_FLOWS;
        } else {
            code = DisconnectionCode.STARTUP_FAILURE;
        }

        this.clusterCoordinator.disconnectionRequestedByNode(getNodeId(), code, exc.toString());
        this.controller.setClustered(false, null);
        this.clusterCoordinator.setConnected(false);
    }

    /**
     * Answers a flow request with the flow serialized from the live controller,
     * holding the read lock while doing so.
     *
     * @throws ProtocolException wrapping any exception raised while building the response
     */
    private FlowResponseMessage handleFlowRequest(FlowRequestMessage flowRequestMessage) throws ProtocolException {
        this.readLock.lock();
        try {
            logger.info("Received flow request message from cluster coordinator.");
            final FlowResponseMessage response = new FlowResponseMessage();
            response.setDataFlow(m14createDataFlowFromController());
            return response;
        } catch (Exception e) {
            throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + e, e);
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @return the authorizer's fingerprint encoded as UTF-8, or {@code null}
     *         when the authorizer is not a managed authorizer
     */
    private byte[] getAuthorizerFingerprint() {
        final boolean managed = AuthorizerCapabilityDetection.isManagedAuthorizer(this.authorizer);
        if (!managed) {
            return null;
        }
        return this.authorizer.getFingerprint().getBytes(StandardCharsets.UTF_8);
    }

    /* renamed from: createDataFlow, reason: merged with bridge method [inline-methods] */
    /**
     * Builds a data flow from the persisted flow when the DAO reports one is
     * present (with an empty set of missing components); otherwise serializes
     * the live controller.
     *
     * @throws IOException if loading or serializing the flow fails
     */
    public StandardDataFlow m15createDataFlow() throws IOException {
        if (this.dao.isFlowPresent()) {
            final ByteArrayOutputStream persistedFlow = new ByteArrayOutputStream();
            this.dao.load(persistedFlow);
            final byte[] flowBytes = persistedFlow.toByteArray();
            final byte[] snippetBytes = this.controller.getSnippetManager().export();
            final byte[] authorizerFingerprint = getAuthorizerFingerprint();
            return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, new HashSet());
        }
        return m14createDataFlowFromController();
    }

    /* renamed from: createDataFlowFromController, reason: merged with bridge method [inline-methods] */
    /**
     * Serializes the live controller into a data flow and records the
     * identifier of every processor, controller service, reporting task,
     * parameter provider and flow registry client whose extension is missing.
     *
     * @throws IOException if the DAO fails to serialize the controller
     */
    public StandardDataFlow m14createDataFlowFromController() throws IOException {
        final byte[] snippetBytes = this.controller.getSnippetManager().export();
        final byte[] authorizerFingerprint = getAuthorizerFingerprint();

        final ByteArrayOutputStream serializedFlow = new ByteArrayOutputStream();
        this.dao.save(this.controller, serializedFlow);
        final byte[] flowBytes = serializedFlow.toByteArray();
        serializedFlow.reset();

        final FlowManager flowManager = this.controller.getFlowManager();
        final Set<String> missingComponentIds = new HashSet<>();

        flowManager.getRootGroup().findAllProcessors().stream()
                .filter(processor -> processor.isExtensionMissing())
                .map(processor -> processor.getIdentifier())
                .forEach(missingComponentIds::add);
        flowManager.getAllControllerServices().stream()
                .filter(service -> service.isExtensionMissing())
                .map(service -> service.getIdentifier())
                .forEach(missingComponentIds::add);
        this.controller.getAllReportingTasks().stream()
                .filter(task -> task.isExtensionMissing())
                .map(task -> task.getIdentifier())
                .forEach(missingComponentIds::add);
        this.controller.getFlowManager().getAllParameterProviders().stream()
                .filter(provider -> provider.isExtensionMissing())
                .map(provider -> provider.getIdentifier())
                .forEach(missingComponentIds::add);
        this.controller.getFlowManager().getAllFlowRegistryClients().stream()
                .filter(client -> client.isExtensionMissing())
                .map(client -> client.getIdentifier())
                .forEach(missingComponentIds::add);

        return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponentIds);
    }

    /**
     * @return the node identifier, read while holding the read lock
     */
    private NodeIdentifier getNodeId() {
        final NodeIdentifier current;
        this.readLock.lock();
        try {
            current = this.nodeId;
        } finally {
            this.readLock.unlock();
        }
        return current;
    }

    /**
     * Processes a reconnection request. The flow carried by the request is
     * loaded; when the request carries no flow, a connection is attempted using
     * the local flow instead. On success the coordinator's node statuses are
     * reset, the flow is saved and the controller is told it is connected. Any
     * exception is logged and reported through
     * {@link #handleConnectionFailure(Exception)}.
     */
    private void handleReconnectionRequest(ReconnectionRequestMessage reconnectionRequestMessage) {
        try {
            logger.info("Processing reconnection request from cluster coordinator.");
            if (this.controller.isConnected()) {
                this.controller.onClusterDisconnect();
            }

            ConnectionResponse response = new ConnectionResponse(getNodeId(), reconnectionRequestMessage.getDataFlow(), reconnectionRequestMessage.getInstanceId(), reconnectionRequestMessage.getNodeConnectionStatuses(), reconnectionRequestMessage.getComponentRevisions());
            if (response.getDataFlow() == null) {
                logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
                response = connect(false, false, m14createDataFlowFromController());
                if (response == null) {
                    logger.warn("Received a Reconnection Request that contained no DataFlow, and was unable to communicate with an active Cluster Coordinator. Cannot connect to cluster at this time.");
                    this.controller.resumeHeartbeats();
                    return;
                }
            }

            loadFromConnectionResponse(response);

            final Map<NodeIdentifier, NodeConnectionStatus> statusesByNode = response.getNodeConnectionStatuses().stream()
                    .collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status));
            this.clusterCoordinator.resetNodeStatuses(statusesByNode);

            saveFlowChanges();
            this.controller.onClusterConnect();
            logger.info("Node reconnected.");
        } catch (Exception e) {
            if (this.controller.isClustered()) {
                disconnect("Failed to properly handle Reconnection request due to " + e.toString());
            }
            logger.error("Handling reconnection request failed due to: " + e, e);
            handleConnectionFailure(e);
        }
    }

    /**
     * Logs the offload request and offloads this node, passing the message's
     * explanation along.
     *
     * @throws InterruptedException if the offload is interrupted
     */
    private void handleOffloadRequest(OffloadMessage offloadMessage) throws InterruptedException {
        final String receivedNotice = "Received offload request message from cluster coordinator with explanation: " + offloadMessage.getExplanation();
        logger.info(receivedNotice);
        offload(offloadMessage.getExplanation());
    }

    /**
     * Offloads this node while holding the write lock: marks it OFFLOADING,
     * stops processing in the root group, terminates processors that are in the
     * STOPPED state, stops transmitting remote process groups, offloads every
     * connection's queue, polls once a second until the controller reports no
     * queued flow files, resets the offloaded queues, marks the node OFFLOADED
     * and notifies the cluster coordinator.
     *
     * @param str explanation recorded in the connection status and log messages
     * @throws InterruptedException if interrupted while waiting for the queues to drain
     */
    private void offload(String str) throws InterruptedException {
        this.writeLock.lock();
        try {
            logger.info("Offloading node due to " + str);
            this.controller.setConnectionStatus(new NodeConnectionStatus(this.nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, str));

            final FlowManager flowManager = this.controller.getFlowManager();
            flowManager.getRootGroup().stopProcessing();

            flowManager.getRootGroup().findAllProcessors().stream()
                    .filter(processor -> processor.getScheduledState() == ScheduledState.STOPPED)
                    .forEach(processor -> processor.getProcessGroup().terminateProcessor(processor));

            flowManager.getRootGroup().findAllRemoteProcessGroups().stream()
                    .filter(group -> group.isTransmitting())
                    .forEach(group -> {
                        try {
                            group.stopTransmitting().get(group.getCommunicationsTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                        } catch (Exception e) {
                            logger.warn("Encountered failure while waiting for {} to shutdown", group, e);
                        }
                    });

            final Set<Connection> connections = flowManager.findAllConnections();
            for (final Connection connection : connections) {
                connection.getFlowFileQueue().offloadQueue();
            }

            // Poll until the controller reports that nothing is queued any more.
            final UserAwareEventAccess eventAccess = this.controller.getEventAccess();
            ProcessGroupStatus status = eventAccess.getControllerStatus();
            while (status.getQueuedCount().intValue() > 0) {
                logger.debug("Offloading queues on node {}, remaining queued count: {}", getNodeId(), status.getQueuedCount());
                Thread.sleep(1000L);
                status = eventAccess.getControllerStatus();
            }

            for (final Connection connection : connections) {
                connection.getFlowFileQueue().resetOffloadedQueue();
            }

            this.controller.setConnectionStatus(new NodeConnectionStatus(this.nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, str));
            this.clusterCoordinator.finishNodeOffload(getNodeId());
            logger.info("Node offloaded due to " + str);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Handles a disconnection request received from the cluster coordinator: logs the explanation
     * carried by the message and then disconnects this node using that explanation.
     *
     * @param disconnectMessage the disconnection message whose explanation is logged and forwarded
     */
    private void handleDisconnectionRequest(DisconnectMessage disconnectMessage) {
        final String coordinatorExplanation = disconnectMessage.getExplanation();
        logger.info("Received disconnection request message from cluster coordinator with explanation: " + coordinatorExplanation);
        disconnect(coordinatorExplanation);
    }

    /**
     * Disconnects this node while holding the write lock: records a connection status built with
     * {@code DisconnectionCode.UNKNOWN}, clears the primary flag, stops heartbeating, marks the
     * controller as not clustered and tells the cluster coordinator that the node is not connected.
     *
     * @param str the explanation for the disconnection; used in log messages and in the connection status
     */
    private void disconnect(String str) {
        final Lock exclusiveLock = this.writeLock;
        exclusiveLock.lock();
        try {
            logger.info("Disconnecting node due to " + str);

            final NodeConnectionStatus disconnectedStatus = new NodeConnectionStatus(this.nodeId, DisconnectionCode.UNKNOWN, str);
            this.controller.setConnectionStatus(disconnectedStatus);

            this.controller.setPrimary(false);
            this.controller.stopHeartbeating();
            this.controller.setClustered(false, null);
            this.clusterCoordinator.setConnected(false);

            logger.info("Node disconnected due to " + str);
        } finally {
            exclusiveLock.unlock();
        }
    }

    /**
     * Loads a flow into the controller and then imports any templates found on disk into the root group.
     *
     * <p>When {@code dataFlow} is {@code null}, the currently persisted flow is copied into memory and used
     * together with the local authorizer fingerprint and an empty set of missing components. Otherwise the
     * flow bytes, authorizer fingerprint and missing components are taken from {@code dataFlow}. In both
     * cases a new {@link StandardDataFlow} with {@code null} snippets is handed to the DAO.</p>
     *
     * @param dataFlow the proposed flow, or {@code null} to load the current local flow
     * @param z when {@code false}, an empty root group after loading is treated as a failure
     * @param bundleUpdateStrategy the bundle update strategy passed through to the DAO
     * @throws IOException if the current flow or the templates cannot be read
     * @throws FlowSynchronizationException if the root group is empty after loading and {@code z} is {@code false}
     */
    private void loadFromBytes(DataFlow dataFlow, boolean z, BundleUpdateStrategy bundleUpdateStrategy) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
        logger.trace("Loading flow from bytes");

        final byte[] flowBytes;
        final byte[] fingerprint;
        final Set<String> missing;
        if (dataFlow != null) {
            flowBytes = dataFlow.getFlow();
            fingerprint = dataFlow.getAuthorizerFingerprint();
            missing = dataFlow.getMissingComponents();
            logger.debug("Loaded flow from proposed flow");
        } else {
            // No proposed flow: fall back to whatever is currently persisted locally.
            final ByteArrayOutputStream currentFlow = new ByteArrayOutputStream();
            copyCurrentFlow(currentFlow);
            flowBytes = currentFlow.toByteArray();
            fingerprint = getAuthorizerFingerprint();
            missing = new HashSet<>();
            logger.debug("Loaded Flow from bytes");
        }

        final DataFlow flowToLoad = new StandardDataFlow(flowBytes, (byte[]) null, fingerprint, missing);
        logger.debug("Loading proposed flow into FlowController");
        this.dao.load(this.controller, flowToLoad, this, bundleUpdateStrategy);

        final ProcessGroup rootGroup = this.controller.getFlowManager().getRootGroup();
        if (rootGroup.isEmpty() && !z) {
            throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
        }

        // Templates already known to the root group win over the copies found on disk.
        for (final Template template : loadTemplates()) {
            if (rootGroup.getTemplate(template.getIdentifier()) != null) {
                logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName());
                continue;
            }
            logger.info("Imported Template '{}' to Root Group", template.getDetails().getName());
            rootGroup.addTemplate(template);
        }
    }

    /**
     * Reads every template file from the configured template directory.
     *
     * <p>Files whose lower-cased name ends with {@code .template} or {@code .xml} are considered. A template
     * without an id is given a name-based UUID derived from the UTF-8 bytes of its name. A file that cannot
     * be turned into a template is logged and skipped rather than failing the whole load.</p>
     *
     * @return the templates that were read; an empty list when the directory listing is unavailable
     * @throws IOException if a template file cannot be opened or closed
     */
    public List<Template> loadTemplates() throws IOException {
        final File[] templateFiles = this.nifiProperties.getTemplateDirectory().toFile().listFiles(candidate -> {
            final String lowerName = candidate.getName().toLowerCase();
            return lowerName.endsWith(".template") || lowerName.endsWith(".xml");
        });
        if (templateFiles == null) {
            // listFiles returns null when the path is not a directory or cannot be listed.
            return Collections.emptyList();
        }

        final List<Template> templates = new ArrayList<>();
        for (final File templateFile : templateFiles) {
            // try-with-resources closes both streams exactly once on every path.
            try (FileInputStream fileIn = new FileInputStream(templateFile);
                 BufferedInputStream bufferedIn = new BufferedInputStream(fileIn)) {
                try {
                    final TemplateDTO dto = TemplateDeserializer.deserialize(bufferedIn);
                    if (dto.getId() == null) {
                        dto.setId(UUID.nameUUIDFromBytes(dto.getName().getBytes(StandardCharsets.UTF_8)).toString());
                    }
                    templates.add(new Template(dto));
                } catch (Exception e) {
                    // Best effort: one bad file must not prevent the others from loading. Keep the cause in the log.
                    logger.error("Unable to interpret " + templateFile + " as a Template. Skipping file.", e);
                }
            }
        }
        return templates;
    }

    /**
     * Sends a connection request to the cluster coordinator, retrying as needed, while holding the read lock.
     *
     * <p>Up to ten attempts are made, or an unbounded number when {@code z2} is {@code true}. Each attempt
     * ends in one of these ways:</p>
     * <ul>
     *   <li>the coordinator asks to try later: sleep for the requested number of seconds and retry;</li>
     *   <li>the coordinator rejects the request: stop and treat it as "no response";</li>
     *   <li>the coordinator accepts the request: stop with that response;</li>
     *   <li>there is no cluster coordinator: register for the role, sleep one second and retry;</li>
     *   <li>any other failure: retry after a sleep when {@code z} is {@code true}, otherwise stop.</li>
     * </ul>
     * <p>An interrupt during any sleep restores the interrupt flag and stops retrying.</p>
     *
     * <p>On success the node UUID from the response is stored in local state; a failure to store it is
     * logged and does not fail the connection.</p>
     *
     * @param z whether to keep retrying after a communications failure
     * @param z2 whether to retry indefinitely instead of stopping after ten attempts
     * @param dataFlow the data flow to include in the connection request
     * @return the successful connection response, or {@code null} if no usable response was obtained
     * @throws ConnectionException declared for callers; not thrown directly by this method
     */
    private ConnectionResponse connect(boolean z, boolean z2, DataFlow dataFlow) throws ConnectionException {
        this.readLock.lock();
        try {
            logger.info("Connecting Node: " + this.nodeId);

            final ConnectionRequest request = new ConnectionRequest(this.nodeId, dataFlow);
            final ConnectionRequestMessage requestMessage = new ConnectionRequestMessage();
            requestMessage.setConnectionRequest(request);

            final int maxAttempts = 10;
            ConnectionResponse response = null;
            for (int attempt = 0; attempt < maxAttempts || z2; attempt++) {
                try {
                    final boolean activeParticipant = this.controller.getLeaderElectionManager().isActiveParticipant("Cluster Coordinator");
                    response = this.senderListener.requestConnection(requestMessage, activeParticipant).getConnectionResponse();

                    if (response.shouldTryLater()) {
                        logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason());
                        try {
                            Thread.sleep(response.getTryLaterSeconds() * 1000L);
                        } catch (InterruptedException ie) {
                            // Interrupted while waiting: restore the flag and give up quickly.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    } else if (response.getRejectionReason() != null) {
                        logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason());
                        // A rejection is handled the same way as receiving no response at all.
                        response = null;
                        break;
                    } else {
                        logger.info("Received successful response from Cluster Coordinator to Connection Request");
                        break;
                    }
                } catch (NoClusterCoordinatorException ncce) {
                    logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node to become the active Cluster Coordinator and will attempt to connect to cluster again");
                    this.controller.registerForClusterCoordinator(true);
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } catch (Exception e) {
                    logger.warn("Failed to connect to cluster due to: " + e);
                    if (logger.isDebugEnabled()) {
                        logger.warn("", e);
                    }
                    if (!z) {
                        break;
                    }
                    try {
                        // NOTE(review): the non-null branch passes a value named "seconds" as milliseconds;
                        // kept as-is to preserve existing timing - confirm whether this is intended.
                        Thread.sleep(response == null ? 5000 : response.getTryLaterSeconds());
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }

            if (response == null) {
                return null;
            }
            if (response.shouldTryLater()) {
                logger.info("Received a 'try again' response from Cluster Coordinator when attempting to connect to cluster with explanation '" + response.getRejectionReason() + "'. However, the maximum number of retries have already completed. Will load local flow and connect to the cluster when able.");
                return null;
            }

            try {
                this.controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId()), Scope.LOCAL);
            } catch (IOException ioe) {
                logger.warn("Received successful response from Cluster Manager but failed to persist state about the Node's Unique Identifier and the Node's Index. This node may be assigned a different UUID when the node is restarted.", ioe);
            }
            return response;
        } finally {
            // The only unlock: releasing the read lock a second time would throw IllegalMonitorStateException.
            this.readLock.unlock();
        }
    }

    /**
     * Applies a successful connection response to this node while holding the write lock.
     *
     * <p>In order, this method:</p>
     * <ol>
     *   <li>resets the coordinator's node statuses when the response carries any;</li>
     *   <li>adopts the node identifier from the response;</li>
     *   <li>loads the response's data flow;</li>
     *   <li>marks the coordinator as connected and resets the revision manager;</li>
     *   <li>marks the controller as clustered and {@code CONNECTED};</li>
     *   <li>initializes the controller, loads snippets and starts heartbeating.</li>
     * </ol>
     *
     * <p>Flow-specific failures are rethrown as the same exception type with added context, so that callers
     * can still distinguish them. A malformed flow and any other failure surface as {@link ConnectionException}.</p>
     *
     * @param connectionResponse the response received from the cluster coordinator
     * @throws ConnectionException if the flow is malformed or the node cannot be connected for any other reason
     */
    private void loadFromConnectionResponse(ConnectionResponse connectionResponse) throws ConnectionException {
        this.writeLock.lock();
        try {
            if (connectionResponse.getNodeConnectionStatuses() != null) {
                this.clusterCoordinator.resetNodeStatuses((Map) connectionResponse.getNodeConnectionStatuses().stream()
                        .collect(Collectors.toMap(status -> status.getNodeIdentifier(), status -> status)));
            }

            final DataFlow dataFlow = connectionResponse.getDataFlow();
            if (logger.isTraceEnabled()) {
                logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8));
            }

            // Assign first so that the log line reports the id that is actually being set.
            this.nodeId = connectionResponse.getNodeIdentifier();
            logger.info("Setting Flow Controller's Node ID: " + this.nodeId);
            this.controller.setNodeId(this.nodeId);

            loadFromBytes(dataFlow, true, BundleUpdateStrategy.USE_SPECIFIED_OR_FAIL);

            this.clusterCoordinator.setLocalNodeIdentifier(this.nodeId);
            this.clusterCoordinator.setConnected(true);
            this.revisionManager.reset(connectionResponse.getComponentRevisions().toRevisionSnapshot());

            this.controller.setClustered(true, connectionResponse.getInstanceId());
            this.controller.setConnectionStatus(new NodeConnectionStatus(this.nodeId, NodeConnectionState.CONNECTED));

            initializeController();
            this.controller.onFlowInitialized(this.autoResumeState);
            loadSnippets(dataFlow.getSnippets());
            this.controller.startHeartbeating();
        } catch (MissingBundleException e) {
            throw new MissingBundleException("Failed to connect node to cluster because cluster flow contains bundles that do not exist on the current node", e);
        } catch (UninheritableFlowException e) {
            throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX, e);
        } catch (FlowSerializationException e) {
            throw new ConnectionException("Failed to connect node to cluster because local or cluster flow is malformed.", e);
        } catch (FlowSynchronizationException e) {
            throw new FlowSynchronizationException("Failed to connect node to cluster because local flow controller partially updated. Administrator should disconnect node and review flow for corruption.", e);
        } catch (Exception e) {
            // Catch-all goes last so that the specific handlers above stay reachable.
            throw new ConnectionException("Failed to connect node to cluster due to: " + e, e);
        } finally {
            this.writeLock.unlock();
        }
    }

    /**
     * Initializes the controller's flow the first time this method is called; later calls do nothing.
     *
     * @throws IOException if the controller fails to initialize the flow
     */
    private void initializeController() throws IOException {
        if (!this.firstControllerInitialization) {
            return;
        }

        logger.debug("First controller initialization, initializing controller...");
        this.controller.initializeFlow();
        // Cleared only after a successful initialization, so a failure leaves the flag set.
        this.firstControllerInitialization = false;
    }

    /**
     * Writes the flow held by the DAO to the given stream while holding the read lock.
     *
     * @param outputStream the stream that receives the flow; it is not closed by this method
     * @throws IOException if the DAO fails to write the flow
     */
    public void copyCurrentFlow(OutputStream outputStream) throws IOException {
        final Lock sharedLock = this.readLock;
        sharedLock.lock();
        try {
            this.dao.load(outputStream);
        } finally {
            sharedLock.unlock();
        }
    }

    /**
     * Writes the current flow, GZIP-compressed, to the given file.
     *
     * <p>Both streams are managed by try-with-resources, so the GZIP stream is closed even when copying
     * the flow fails.</p>
     *
     * @param file the destination file
     * @throws IOException if the file cannot be opened or the flow cannot be written
     */
    public void copyCurrentFlow(File file) throws IOException {
        // NOTE(review): the second GZIPOutputStream argument is the buffer size in bytes, not a
        // compression level; the value 1 is kept unchanged - confirm it is intentional.
        try (OutputStream fileOut = new FileOutputStream(file);
             OutputStream gzipOut = new GZIPOutputStream(fileOut, 1)) {
            copyCurrentFlow(gzipOut);
        }
    }

    /**
     * Replaces the snippets held by the controller's snippet manager with those parsed from the given bytes.
     *
     * <p>A {@code null} or empty array is treated as "nothing to load": the snippet manager is left
     * untouched.</p>
     *
     * @param bArr the serialized snippets; may be {@code null} or empty
     */
    public void loadSnippets(byte[] bArr) {
        if (bArr == null || bArr.length == 0) {
            return;
        }

        final SnippetManager snippetManager = this.controller.getSnippetManager();
        // Existing snippets are discarded before the parsed ones are added.
        snippetManager.clear();
        for (final StandardSnippet snippet : SnippetManager.parseBytes(bArr)) {
            snippetManager.addSnippet(snippet);
        }
    }
}
