/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.MonitorActivity;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestMonitorActivity {
    @Test
    public void testFirstMessage() throws InterruptedException {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        runner.enqueue(new byte[0]);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        runner.clearTransferState();
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key", "value");
        attributes.put("key1", "value1");
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        String flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
        runner.clearTransferState();
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.clearTransferState();
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
    }

    @Test
    public void testFirstMessageWithWaitForActivityTrue() throws InterruptedException {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        runner.setProperty(MonitorActivity.WAIT_FOR_ACTIVITY, "true");
        runner.run(1, false);
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.enqueue(new byte[0]);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        runner.clearTransferState();
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key", "value");
        attributes.put("key1", "value1");
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
        runner.clearTransferState();
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.clearTransferState();
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.enqueue("lorem ipsum");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInput() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.run(1, false);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputButClusterIsActive() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis());
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.run(1, false);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasInactiveLastTime() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
        runner.getStateManager().setState(Collections.singletonMap("LocalFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.LOCAL);
        runner.run(1, false);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        StateMap updatedLocalState = runner.getStateManager().getState(Scope.LOCAL);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedLocalState.get("LocalFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndWasActiveLastTime() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis());
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 minutes");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
        runner.run(1, false);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedInactiveSinceLastTime() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
        runner.run(1, false);
        StateMap updatedClusterState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedClusterState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        StateMap updatedLocalState = runner.getStateManager().getState(Scope.LOCAL);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedLocalState.get("LocalFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySetAndNoInputAndTurnedActiveSinceLastTime() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis());
        String lastSuccessInLocal = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
        runner.getStateManager().setState(Collections.singletonMap("LocalFlowActivityInfo.lastSuccessfulTransfer", lastSuccessInLocal), Scope.LOCAL);
        runner.run(1, false);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)lastSuccessInCluster, (Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertNull((Object)runner.getStateManager().getState(Scope.LOCAL).get("LocalFlowActivityInfo.lastSuccessfulTransfer"));
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testReconcileWhenSharedStateIsNotYetSet() throws Exception {
        TestableProcessor processor = new TestableProcessor(0L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        runner.setConnected(true);
        this.runNext(runner);
        long tLocal = processor.getLastSuccessfulTransfer();
        long tCluster = this.getLastSuccessFromCluster(runner);
        Assertions.assertEquals((long)tLocal, (long)tCluster);
    }

    @Test
    public void testReconcileAfterReconnectWhenPrimary() throws InterruptedException, IOException {
        TestRunner runner = this.getRunnerScopeCluster(new MonitorActivity(), true);
        MockStateManager stateManager = runner.getStateManager();
        runner.enqueue("lorem ipsum");
        runner.run(1, false);
        String lastSuccessTransferAfterFirstTrigger = stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer");
        this.assertTransferCountSuccessInactiveRestored(runner, 1, 0);
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        TimeUnit.MILLISECONDS.sleep(500L);
        this.runNext(runner);
        Assertions.assertEquals((Object)lastSuccessTransferAfterFirstTrigger, (Object)stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
        runner.setConnected(true);
        TimeUnit.MILLISECONDS.sleep(500L);
        this.runNext(runner);
        Assertions.assertNotEquals((Object)lastSuccessTransferAfterFirstTrigger, (Object)stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 1);
    }

    @Test
    public void testReconcileAfterReconnectWhenNotPrimary() throws IOException, InterruptedException {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = this.getRunnerScopeCluster(processor, false);
        MockStateManager stateManager = runner.getStateManager();
        runner.enqueue("lorem ipsum");
        runner.run(1, false);
        String lastSuccessTransferAfterFirstTrigger = stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer");
        this.assertTransferCountSuccessInactiveRestored(runner, 1, 0);
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        TimeUnit.MILLISECONDS.sleep(500L);
        this.runNext(runner);
        Assertions.assertEquals((Object)lastSuccessTransferAfterFirstTrigger, (Object)stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
        runner.setConnected(true);
        this.runNext(runner);
        Assertions.assertNotEquals((Object)lastSuccessTransferAfterFirstTrigger, (Object)stateManager.getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
    }

    private void runNext(TestRunner runner) {
        runner.run(1, false, false);
    }

    private TestRunner getRunnerScopeCluster(MonitorActivity processor, boolean isPrimary) {
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(isPrimary);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "10 millis");
        return runner;
    }

    private Long getLastSuccessFromCluster(TestRunner runner) throws IOException {
        return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
    }

    private void assertTransferCountSuccessInactiveRestored(TestRunner runner, int success, int inactive) {
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, success);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, inactive);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
    }

    @Test
    public void testNoReportingWhenDisconnected() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(5L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "3 minutes");
        runner.setConnected(false);
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.setConnected(true);
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
    }

    @Test
    public void testFirstMessageWithInherit() throws InterruptedException {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.enqueue(new byte[0]);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        MockFlowFile originalFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
        runner.clearTransferState();
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key", "value");
        attributes.put("key1", "value1");
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        String flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeEquals("key", "value");
        restoredFlowFile.assertAttributeEquals("key1", "value1");
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(), originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assertions.assertNotEquals((long)restoredFlowFile.getSize(), (long)originalFlowFile.getSize());
        runner.clearTransferState();
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        TimeUnit.MILLISECONDS.sleep(200L);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.clearTransferState();
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeEquals("key", "value");
        restoredFlowFile.assertAttributeEquals("key1", "value1");
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(), originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assertions.assertNotEquals((long)restoredFlowFile.getSize(), (long)originalFlowFile.getSize());
    }

    @Timeout(value=5L)
    @Test
    public void testFirstRunNoMessages() throws InterruptedException {
        boolean rerun;
        TestRunner runner = TestRunners.newTestRunner((Processor)new MonitorActivity());
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        int threshold = 100;
        do {
            rerun = false;
            runner.setProperty(MonitorActivity.THRESHOLD, threshold + " millis");
            TimeUnit.MILLISECONDS.sleep(1000L);
            runner.run();
            runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
            List inactiveFlowFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
            if (inactiveFlowFiles.size() == 1) {
                threshold += threshold;
                rerun = true;
            } else {
                runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
            }
            runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
            runner.clearTransferState();
        } while (rerun);
    }

    @Test
    public void testClusterMonitorActive() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertNull((Object)updatedState.get("key1"));
        Assertions.assertNull((Object)updatedState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
    }

    @Test
    public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        HashMap<String, String> existingState = new HashMap<String, String>();
        long existingTimestamp = System.currentTimeMillis() - 1000L;
        existingState.put("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(existingTimestamp));
        existingState.put("key1", "value1");
        existingState.put("key2", "value2");
        runner.getStateManager().setState(existingState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
        long postProcessedTimestamp = Long.parseLong(postProcessedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertTrue((existingTimestamp < postProcessedTimestamp ? 1 : 0) != 0);
        Assertions.assertNull((Object)postProcessedState.get("key1"));
        Assertions.assertNull((Object)postProcessedState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        HashMap<String, String> existingState = new HashMap<String, String>();
        long existingTimestamp = System.currentTimeMillis() + 10000L;
        existingState.put("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(existingTimestamp));
        existingState.put("key1", "value1");
        existingState.put("key2", "value2");
        runner.getStateManager().setState(existingState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)String.valueOf(existingTimestamp), (Object)postProcessedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertEquals((Object)postProcessedState.get("key1"), existingState.get("key1"));
        Assertions.assertEquals((Object)postProcessedState.get("key2"), existingState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveCopyAttribute() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
    }

    @Test
    public void testClusterMonitorInactivity() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityFallbackToNodeScope() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnPrimaryNode() {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnNode() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelf() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNull((Object)updatedState.get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        TimeUnit.MILLISECONDS.sleep(3334L);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)0, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "10 sec");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        TimeUnit.MILLISECONDS.sleep(3334L);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)0, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.clearTransferState();
    }

    @Test
    public void testDisconnectedNodeActivatesFlow() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        runner.setConnected(false);
        runner.enqueue("Incoming data");
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
    }

    @Test
    public void testDisconnectedNodeDeactivatesFlowOnlyWhenConnected() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.clearTransferState();
        runner.setConnected(true);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
    }

    @Test
    public void testLocalStateIsNotDeletedInStandaloneCaseWhenStopped() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
        runner.stop();
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
    }

    @Test
    public void testLocalStateIsNotDeletedInClusteredCaseNodeScopeWhenStopped() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
        runner.stop();
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
    }

    @Test
    public void testLocalStateIsNotDeletedInClusteredCaseClusterScopeWhenStopped() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
        runner.stop();
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
    }

    @Test
    public void testLocalStateIsNotDeletedInClusteredCaseWhenDisconnectedAndStopped() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 min");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
        runner.setConnected(false);
        runner.stop();
        Assertions.assertFalse((boolean)runner.getStateManager().getState(Scope.LOCAL).toMap().isEmpty());
    }

    @Test
    public void testActivationMarkerIsImmediateWhenAnyOtherNodeActivatesTheFlow() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.DAYS.toMillis(1L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis())), Scope.CLUSTER);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_ACTIVITY_RESTORED);
    }

    @Test
    public void testDisconnectNodeAndActivateBothTheOtherNodesAndTheDisconnectedNodeIndependently() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.DAYS.toMillis(1L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        runner.setConnected(false);
        runner.enqueue("Incoming data");
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.clearTransferState();
        runner.getStateManager().setState(Collections.singletonMap("CommonFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis())), Scope.CLUSTER);
        runner.setConnected(true);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
    }

    @Test
    public void testClusterStateIsImmediatelyUpdatedOnActivation() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.DAYS.toMillis(1L)));
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_ALL);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        Assertions.assertNull((Object)runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
        runner.enqueue("Incoming data");
        this.runNext(runner);
        Assertions.assertNotNull((Object)runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer"));
    }

    @Test
    public void testResetStateOnStartupByDefault() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new MonitorActivity());
        runner.setIsConfiguredForClustering(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
        runner.getStateManager().setState(Collections.singletonMap("LocalFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1L))), Scope.LOCAL);
        runner.enqueue("Incoming data");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
    }

    @Test
    public void testResetStateOnStartupDisabled() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new MonitorActivity());
        runner.setIsConfiguredForClustering(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.THRESHOLD, "24 hours");
        runner.setProperty(MonitorActivity.RESET_STATE_ON_RESTART, Boolean.FALSE.toString());
        runner.getStateManager().setState(Collections.singletonMap("LocalFlowActivityInfo.lastSuccessfulTransfer", String.valueOf(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1L))), Scope.LOCAL);
        runner.enqueue("Incoming data");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
    }

    @Test
    public void testMultipleFlowFilesActivateTheFlowInSingleTriggerResultsInSingleMarker() throws IOException {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.DAYS.toMillis(1L)));
        runner.setIsConfiguredForClustering(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 hours");
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        runner.enqueue("Incoming data 1");
        runner.enqueue("Incoming data 2");
        runner.enqueue("Incoming data 3");
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 3);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
    }

    @Test
    public void testInfrequentFlowFilesTriggerImmediateSynchronization() throws IOException, InterruptedException {
        long threshold_seconds = 30L;
        long startup_time_seconds = 1L;
        final AtomicLong nowProvider = new AtomicLong(TimeUnit.SECONDS.toMillis(1L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new MonitorActivity(){

            protected long nowMillis() {
                return nowProvider.get();
            }
        });
        runner.setIsConfiguredForClustering(true);
        runner.setConnected(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "30 seconds");
        runner.run(1, false);
        String state_0 = runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer");
        Assertions.assertNull((Object)state_0);
        runner.enqueue("Incoming data 1");
        this.runNext(runner);
        String state_1 = runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer");
        Assertions.assertNotNull((Object)state_1);
        nowProvider.set(TimeUnit.SECONDS.toMillis(22L));
        this.runNext(runner);
        runner.enqueue("Incoming data 2");
        this.runNext(runner);
        String state_2 = runner.getStateManager().getState(Scope.CLUSTER).get("CommonFlowActivityInfo.lastSuccessfulTransfer");
        Assertions.assertNotEquals((Object)state_1, (Object)state_2);
    }

    private static class TestableProcessor
    extends MonitorActivity {
        private final long startupTime;

        public TestableProcessor(long timestampDifference) {
            this.startupTime = System.currentTimeMillis() - timestampDifference;
        }

        protected long getStartupTime() {
            return this.startupTime;
        }
    }
}

