/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.MonitorActivity;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestMonitorActivity {
    @Test
    public void testFirstMessage() {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        runner.enqueue(new byte[0]);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        runner.clearTransferState();
        processor.resetLastSuccessfulTransfer();
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key", "value");
        attributes.put("key1", "value1");
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        String flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
        runner.clearTransferState();
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        processor.resetLastSuccessfulTransfer();
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.clearTransferState();
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeNotExists("key");
        restoredFlowFile.assertAttributeNotExists("key1");
    }

    @Test
    public void testReconcileAfterFirstStartWhenLastSuccessIsAlreadySet() throws Exception {
        String lastSuccessInCluster = String.valueOf(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5L));
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(0L));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.getStateManager().setState(Collections.singletonMap("MonitorActivity.latestSuccessTransfer", lastSuccessInCluster), Scope.CLUSTER);
        runner.enqueue("lorem ipsum");
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotEquals((Object)lastSuccessInCluster, (Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
    }

    @Test
    public void testReconcileWhenSharedStateIsNotYetSet() throws Exception {
        TestableProcessor processor = new TestableProcessor(0L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "5 secs");
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        runner.run(1, false, false);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        runner.setConnected(true);
        runner.run(1, false, false);
        long tLocal = processor.getLatestSuccessTransfer();
        long tCluster = this.getLastSuccessFromCluster(runner);
        Assertions.assertEquals((long)tLocal, (long)tCluster);
    }

    @Test
    public void testReconcileAfterReconnectWhenPrimary() throws InterruptedException {
        TestRunner runner = this.getRunnerScopeCluster(new MonitorActivity(), true);
        runner.enqueue("lorem ipsum");
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 1, 0);
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
        runner.setConnected(true);
        TimeUnit.MILLISECONDS.sleep(500L);
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 1);
    }

    @Test
    public void testReconcileAfterReconnectWhenNotPrimary() {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = this.getRunnerScopeCluster(processor, false);
        runner.enqueue("lorem ipsum");
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 1, 0);
        runner.setConnected(false);
        runner.enqueue("lorem ipsum");
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
        runner.setConnected(true);
        this.runNext(runner);
        this.assertTransferCountSuccessInactiveRestored(runner, 2, 0);
    }

    private void runNext(TestRunner runner) {
        runner.run(1, false, false);
    }

    private TestRunner getRunnerScopeCluster(MonitorActivity processor, boolean isPrimary) {
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(isPrimary);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "10 millis");
        return runner;
    }

    private Long getLastSuccessFromCluster(TestRunner runner) throws IOException {
        return Long.valueOf(runner.getStateManager().getState(Scope.CLUSTER).get("MonitorActivity.latestSuccessTransfer"));
    }

    private void assertTransferCountSuccessInactiveRestored(TestRunner runner, int success, int inactive) {
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, success);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, inactive);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
    }

    @Test
    public void testNoReportingWhenDisconnected() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(5L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "3 minutes");
        runner.setConnected(false);
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.setConnected(true);
        runner.run(1, false);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
    }

    @Test
    public void testFirstMessageWithInherit() {
        TestableProcessor processor = new TestableProcessor(1000L);
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        runner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.enqueue(new byte[0]);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        MockFlowFile originalFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
        runner.clearTransferState();
        processor.resetLastSuccessfulTransfer();
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key", "value");
        attributes.put("key1", "value1");
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        String flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeEquals("key", "value");
        restoredFlowFile.assertAttributeEquals("key1", "value1");
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(), originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assertions.assertNotEquals((long)restoredFlowFile.getSize(), (long)originalFlowFile.getSize());
        runner.clearTransferState();
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        processor.resetLastSuccessfulTransfer();
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 1);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.clearTransferState();
        runner.enqueue(new byte[0], attributes);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        restoredFlowFile = (MockFlowFile)runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        flowFileContent = new String(restoredFlowFile.toByteArray());
        Assertions.assertTrue((boolean)Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", flowFileContent));
        restoredFlowFile.assertAttributeEquals("key", "value");
        restoredFlowFile.assertAttributeEquals("key1", "value1");
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.UUID.key(), originalFlowFile.getAttribute(CoreAttributes.UUID.key()));
        restoredFlowFile.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assertions.assertNotEquals((long)restoredFlowFile.getSize(), (long)originalFlowFile.getSize());
    }

    @Timeout(value=5L)
    @Test
    public void testFirstRunNoMessages() throws InterruptedException {
        boolean rerun;
        TestRunner runner = TestRunners.newTestRunner((Processor)new MonitorActivity());
        runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        int threshold = 100;
        do {
            rerun = false;
            runner.setProperty(MonitorActivity.THRESHOLD, threshold + " millis");
            Thread.sleep(1000L);
            runner.run();
            runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
            List inactiveFlowFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
            if (inactiveFlowFiles.size() == 1) {
                threshold += threshold;
                rerun = true;
            } else {
                runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
            }
            runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
            runner.clearTransferState();
        } while (rerun);
    }

    @Test
    public void testClusterMonitorInvalidReportingNode() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.assertNotValid();
    }

    @Test
    public void testClusterMonitorActive() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertNull((Object)updatedState.get("key1"));
        Assertions.assertNull((Object)updatedState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
    }

    @Test
    public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        HashMap<String, String> existingState = new HashMap<String, String>();
        long existingTimestamp = System.currentTimeMillis() - 1000L;
        existingState.put("MonitorActivity.latestSuccessTransfer", String.valueOf(existingTimestamp));
        existingState.put("key1", "value1");
        existingState.put("key2", "value2");
        runner.getStateManager().setState(existingState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertTrue((existingTimestamp < Long.parseLong(postProcessedState.get("MonitorActivity.latestSuccessTransfer")) ? 1 : 0) != 0);
        Assertions.assertNull((Object)postProcessedState.get("key1"));
        Assertions.assertNull((Object)postProcessedState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.enqueue("Incoming data");
        HashMap<String, String> existingState = new HashMap<String, String>();
        long existingTimestamp = System.currentTimeMillis() + 10000L;
        existingState.put("MonitorActivity.latestSuccessTransfer", String.valueOf(existingTimestamp));
        existingState.put("key1", "value1");
        existingState.put("key2", "value2");
        runner.getStateManager().setState(existingState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), existingState, Scope.CLUSTER);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap postProcessedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertEquals((Object)String.valueOf(existingTimestamp), (Object)postProcessedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertEquals((Object)postProcessedState.get("key1"), existingState.get("key1"));
        Assertions.assertEquals((Object)postProcessedState.get("key2"), existingState.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveCopyAttribute() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
    }

    @Test
    public void testClusterMonitorInactivity() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityFallbackToNodeScope() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnPrimaryNode() {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List inactiveFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assertions.assertEquals((int)1, (int)inactiveFiles.size());
        MockFlowFile inactiveFile = (MockFlowFile)inactiveFiles.get(0);
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityStartMillis"));
        Assertions.assertNotNull((Object)inactiveFile.getAttribute("inactivityDurationMillis"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnNode() {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelf() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNotNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        Assertions.assertEquals((Object)"value1", (Object)updatedState.get("key1"));
        Assertions.assertEquals((Object)"value2", (Object)updatedState.get("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(false);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("key1", "value1");
        attributes.put("key2", "value2");
        runner.enqueue("Incoming data", attributes);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)1, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        StateMap updatedState = runner.getStateManager().getState(Scope.CLUSTER);
        Assertions.assertNull((Object)updatedState.get("MonitorActivity.latestSuccessTransfer"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)0, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
        TestableProcessor processor = new TestableProcessor(TimeUnit.MINUTES.toMillis(120L));
        TestRunner runner = TestRunners.newTestRunner((Processor)processor);
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(true);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        this.runNext(runner);
        List successFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List activityRestoredFiles = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assertions.assertEquals((int)0, (int)successFiles.size());
        Assertions.assertEquals((int)1, (int)activityRestoredFiles.size());
        Assertions.assertEquals((Object)"value1", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key1"));
        Assertions.assertEquals((Object)"value2", (Object)((MockFlowFile)activityRestoredFiles.get(0)).getAttribute("key2"));
        runner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
        TestRunner runner = TestRunners.newTestRunner((Processor)new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        runner.setIsConfiguredForClustering(true);
        runner.setPrimaryNode(false);
        runner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        runner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        runner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        runner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        runner.run();
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.clearTransferState();
        HashMap<String, String> clusterState = new HashMap<String, String>();
        clusterState.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        clusterState.put("key1", "value1");
        clusterState.put("key2", "value2");
        runner.getStateManager().setState(clusterState, Scope.CLUSTER);
        runner.getStateManager().replace(runner.getStateManager().getState(Scope.CLUSTER), clusterState, Scope.CLUSTER);
        this.runNext(runner);
        runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        runner.clearTransferState();
    }

    private static class TestableProcessor
    extends MonitorActivity {
        private final long timestampDifference;

        public TestableProcessor(long timestampDifference) {
            this.timestampDifference = timestampDifference;
        }

        public void resetLastSuccessfulTransfer() {
            this.setLastSuccessfulTransfer(System.currentTimeMillis() - this.timestampDifference);
        }
    }
}

