/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.TestNotify;
import org.apache.nifi.processors.standard.Wait;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestWait {
    private TestRunner runner;
    private TestNotify.MockCacheClient service;

    /**
     * Builds a fresh {@link Wait} test runner before every test and wires it to a new,
     * enabled {@link TestNotify.MockCacheClient} registered under the id {@code "service"}.
     *
     * @throws InitializationException propagated from the test-runner wiring calls
     */
    @BeforeEach
    public void setup() throws InitializationException {
        final TestRunner testRunner = TestRunners.newTestRunner(Wait.class);
        final TestNotify.MockCacheClient cacheClient = new TestNotify.MockCacheClient();

        // Publish both collaborators to the fields before wiring them together.
        runner = testRunner;
        service = cacheClient;

        testRunner.addControllerService("service", cacheClient);
        testRunner.enableControllerService(cacheClient);
        testRunner.setProperty(Wait.DISTRIBUTED_CACHE_SERVICE, "service");
    }

    /**
     * With no notification in the cache, the single enqueued FlowFile must be transferred to
     * {@link Wait#REL_WAIT} and carry the {@code wait.start.timestamp} attribute.
     */
    @Test
    public void testWait() throws InitializationException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waiting = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waiting.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * With the wait mode set to {@link Wait#WAIT_MODE_KEEP_IN_UPSTREAM}, an un-notified
     * FlowFile must still be present in the runner's input queue after one run.
     */
    @Test
    public void testWaitKeepInUpstreamConnection() throws InitializationException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.WAIT_MODE, Wait.WAIT_MODE_KEEP_IN_UPSTREAM);

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        // Nothing is routed; the FlowFile is expected to remain queued upstream.
        runner.assertQueueNotEmpty();
        runner.clearTransferState();
    }

    /**
     * A FlowFile that first goes to {@link Wait#REL_WAIT} and is re-processed after the
     * configured 100 ms expiration has elapsed must be routed to {@link Wait#REL_EXPIRED}
     * without the {@code wait.start.timestamp} attribute.
     */
    @Test
    public void testExpired() throws InitializationException, InterruptedException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], attributes);

        // First pass: no signal yet, so the FlowFile waits and gets its start timestamp.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waiting = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waiting.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();

        // Feed the waiting FlowFile back in and let the expiration duration pass.
        runner.enqueue(waiting);
        Thread.sleep(101L);

        // Second pass: the FlowFile is now expired.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
        final MockFlowFile expired = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
        expired.assertAttributeNotExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * Expiration with a partially notified signal: the target count is 5, but only
     * {@code counter-A} (1) and {@code counter-B} (2) are notified before the 100 ms
     * expiration elapses. The expired FlowFile must expose the counter values and the
     * signal attributes, and must no longer carry {@code wait.start.timestamp}.
     */
    @Test
    public void testCounterExpired() throws InitializationException, InterruptedException, IOException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "5");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "notification-id");
        runner.enqueue(new byte[0], attributes);

        // First pass: nothing notified yet, the FlowFile waits.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waiting = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waiting.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waiting);

        // Notify two counters (total 3), which stays below the target of 5.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        final Map<String, String> signalAttributes = new HashMap<>();
        signalAttributes.put("signal-attr-1", "signal-attr-1-value");
        signalAttributes.put("signal-attr-2", "signal-attr-2-value");
        protocol.notify("notification-id", "counter-A", 1, signalAttributes);
        protocol.notify("notification-id", "counter-B", 2, signalAttributes);

        // Let the expiration duration pass before the second run.
        Thread.sleep(101L);
        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
        final MockFlowFile expired = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
        expired.assertAttributeNotExists("wait.start.timestamp");
        expired.assertAttributeEquals("wait.counter.total", "3");
        expired.assertAttributeEquals("wait.counter.counter-A", "1");
        expired.assertAttributeEquals("wait.counter.counter-B", "2");
        expired.assertAttributeEquals("signal-attr-1", "signal-attr-1-value");
        expired.assertAttributeEquals("signal-attr-2", "signal-attr-2-value");
        runner.clearTransferState();
    }

    /**
     * A FlowFile arriving with a non-numeric {@code wait.start.timestamp} ("blue bunny")
     * must be routed to {@link Wait#REL_FAILURE} with that attribute removed.
     */
    @Test
    public void testBadWaitStartTimestamp() {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        attributes.put("wait.start.timestamp", "blue bunny");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
        final MockFlowFile failed = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
        failed.assertAttributeNotExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * A FlowFile without the {@code releaseSignalAttribute} attribute (so the release signal
     * identifier expression has nothing to resolve) must be routed to
     * {@link Wait#REL_FAILURE} and must carry neither {@code wait.start.timestamp} nor
     * {@code wait.counter.total}.
     */
    @Test
    public void testEmptyReleaseSignal() {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        // Deliberately empty attribute map; typed so enqueue() is called without an unchecked conversion.
        final Map<String, String> props = new HashMap<>();
        runner.enqueue(new byte[0], props);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
        final MockFlowFile failed = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
        failed.assertAttributeNotExists("wait.start.timestamp");
        failed.assertAttributeNotExists("wait.counter.total");
        runner.clearTransferState();
    }

    /**
     * When the mock cache client is told to fail on calls, running the processor must raise
     * an {@link AssertionError} whose cause is a {@link ProcessException} that in turn wraps
     * an {@link IOException}.
     */
    @Test
    public void testFailingCacheService() {
        service.setFailOnCalls(true);
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "2");
        runner.enqueue(new byte[0], attributes);

        final AssertionError failure = Assertions.assertThrows(AssertionError.class, () -> runner.run());

        // Walk the cause chain: AssertionError -> ProcessException -> IOException.
        final Throwable processFailure = failure.getCause();
        Assertions.assertInstanceOf(ProcessException.class, processFailure);
        Assertions.assertInstanceOf(IOException.class, processFailure.getCause());

        service.setFailOnCalls(false);
    }

    /**
     * With a wait penalty duration of "1 hour", the first FlowFile for signal id "1" goes to
     * {@link Wait#REL_WAIT} and the processor records exactly one penalized signal id ("1").
     * A second FlowFile with the same signal id must then not be transferred to
     * {@link Wait#REL_WAIT} on the next run (presumably because the signal id is still
     * penalized; the test only asserts the zero transfer count).
     */
    @Test
    public void testWaitPenaltyDuration() {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.WAIT_PENALTY_DURATION, "1 hour");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], props);

        // First run: the FlowFile waits and receives its start timestamp.
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waiting = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waiting.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();

        // Only size() and containsKey() are needed, so a wildcard map avoids a raw type
        // without assuming the processor's exact value type.
        final Wait processor = (Wait) runner.getProcessor();
        final Map<?, ?> signalIdPenalties = processor.getSignalIdPenalties();
        Assertions.assertEquals(1, signalIdPenalties.size());
        Assertions.assertTrue(signalIdPenalties.containsKey("1"));

        // Second run with the same signal id: nothing is expected on the wait relationship.
        runner.enqueue(new byte[0], props);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0);
        runner.clearTransferState();
    }

    /**
     * With the attribute copy mode set to {@link Wait#ATTRIBUTE_COPY_REPLACE}, a released
     * FlowFile takes the notified value for an attribute present on both sides ("both"),
     * gains the notify-only attribute and keeps its wait-only attribute. After the release
     * the signal "key" must no longer be retrievable from the cache.
     */
    @Test
    public void testReplaceAttributes() throws InitializationException, IOException {
        // Pre-populate the cache with a signal carrying the notifier's attributes.
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "default", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue());

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        final String flowFileContent = "content";
        // Charset constant instead of a by-name lookup.
        runner.enqueue(flowFileContent.getBytes(StandardCharsets.UTF_8), waitAttributes);

        // The signal exists before the processor runs.
        Assertions.assertNotNull(protocol.getSignal("key"));

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        runner.assertTransferCount(Wait.REL_SUCCESS, 1);
        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assertions.assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
        // Replace mode: the notified value wins for the shared attribute.
        Assertions.assertEquals("notifyValue", outputFlowFile.getAttribute("both"));
        runner.clearTransferState();

        // The signal has been consumed.
        Assertions.assertNull(protocol.getSignal("key"));
    }

    /**
     * With the attribute copy mode set to {@link Wait#ATTRIBUTE_COPY_KEEP_ORIGINAL}, a
     * released FlowFile keeps its own value for an attribute present on both sides ("both"),
     * while still gaining the notify-only attribute.
     */
    @Test
    public void testKeepOriginalAttributes() throws InitializationException, IOException {
        // Pre-populate the cache with a signal carrying the notifier's attributes.
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "default", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue());

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        final String flowFileContent = "content";
        // Charset constant instead of a by-name lookup.
        runner.enqueue(flowFileContent.getBytes(StandardCharsets.UTF_8), waitAttributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        runner.assertTransferCount(Wait.REL_SUCCESS, 1);
        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assertions.assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
        // Keep-original mode: the FlowFile's own value wins for the shared attribute.
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("both"));
        runner.clearTransferState();
    }

    /**
     * Waiting on a target count of 3 with no counter name configured: the FlowFile keeps
     * going to {@link Wait#REL_WAIT} (with an unchanged {@code wait.start.timestamp}) while
     * only one or two notifications exist, and is released once a third counter is notified.
     * After the release the signal "key" must be gone from the cache.
     */
    @Test
    public void testWaitForTotalCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // First notification: counter-A = 1.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter-A", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        final String flowFileContent = "content";
        // Charset constant instead of a by-name lookup.
        runner.enqueue(flowFileContent.getBytes(StandardCharsets.UTF_8), waitAttributes);

        // Run 1: one notification so far, the FlowFile waits.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        final String initialTimestamp = waitingFlowFile.getAttribute("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 2: counter-B notified (two notifications), still waiting with the same timestamp.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", initialTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 3: no new notification, nothing changes.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", initialTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 4: counter-C notified (three notifications), the FlowFile is released.
        protocol.notify("key", "counter-C", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assertions.assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("both"));
        runner.clearTransferState();

        // The failure message states the expectation rather than asserting it as fact.
        Assertions.assertNull(protocol.getSignal("key"), "The key should no longer exist");
    }

    /**
     * Waiting on a specific counter: the FlowFile targets a count of 2 on {@code counter-B}.
     * Notifications on other counters (A, C) and a single notification on counter-B leave it
     * on {@link Wait#REL_WAIT} with an unchanged {@code wait.start.timestamp}; the second
     * counter-B notification releases it. The released FlowFile exposes every counter value.
     */
    @Test
    public void testWaitForSpecificCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // First notification goes to a counter the FlowFile is not waiting on.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter-A", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "2");
        waitAttributes.put("signalCounterName", "counter-B");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        final String flowFileContent = "content";
        // Charset constant instead of a by-name lookup.
        runner.enqueue(flowFileContent.getBytes(StandardCharsets.UTF_8), waitAttributes);

        // Run 1: counter-B has not been notified at all.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        final String initialTimestamp = waitingFlowFile.getAttribute("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 2: counter-B = 1, still below the target of 2.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", initialTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 3: counter-C is notified, which does not count towards counter-B.
        protocol.notify("key", "counter-C", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", initialTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Run 4: counter-B reaches 2, the FlowFile is released.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assertions.assertEquals("notifyValue", outputFlowFile.getAttribute("notify.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("wait.only"));
        Assertions.assertEquals("waitValue", outputFlowFile.getAttribute("both"));
        outputFlowFile.assertAttributeEquals("wait.counter.total", "4");
        outputFlowFile.assertAttributeEquals("wait.counter.counter-A", "1");
        outputFlowFile.assertAttributeEquals("wait.counter.counter-B", "2");
        outputFlowFile.assertAttributeEquals("wait.counter.counter-C", "1");
        runner.clearTransferState();
    }

    /**
     * Two notifications on {@code counter} with a target count of 1: the first FlowFile is
     * released and sees {@code wait.counter.counter} = 2, after which the cached signal
     * reports a count of 0 and a releasable count of 1. A second FlowFile is then released
     * as well (seeing a counter of 0) and the signal "key" disappears from the cache.
     */
    @Test
    public void testDecrementCache() throws ConcurrentModificationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // Notify the same counter twice, one count each.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter", 1, cachedAttributes);
        protocol.notify("key", "counter", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
        runner.assertValid();

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("signalCounterName", "counter");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        waitAttributes.put("uuid", UUID.randomUUID().toString());
        final String flowFileContent = "content";
        // Charset constant instead of a by-name lookup.
        final byte[] content = flowFileContent.getBytes(StandardCharsets.UTF_8);
        runner.enqueue(content, waitAttributes);

        // First FlowFile: released, and it observes both notifications.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");

        // The cached signal now has the counter at 0 with one release still available.
        Assertions.assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter")));
        Assertions.assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount()));

        // Second FlowFile: also released, after which the signal is removed.
        runner.enqueue(content, waitAttributes);
        runner.clearTransferState();
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        outputFlowFile.assertAttributeNotExists("wait.start.timestamp");
        outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
        // The failure message states the expectation rather than asserting it as fact.
        Assertions.assertNull(protocol.getSignal("key"), "The key should no longer exist");
        runner.clearTransferState();
    }

    /**
     * Exercises a wait buffer count of 2 with FlowFiles for two signal ids (key-A, key-B)
     * plus one FlowFile lacking the signal attribute.
     *
     * <p>Run 1 (no notification): FlowFiles "1" and "5" are expected on wait and "3" on
     * failure. Run 2 (key-B notified with 3 counts, waiting FlowFiles re-enqueued): "6" is
     * expected to be released and "1" to wait again.
     */
    @Test
    public void testWaitBufferCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");

        final Map<String, String> waitAttributesA = new HashMap<>();
        waitAttributesA.put("releaseSignalAttribute", "key-A");
        waitAttributesA.put("targetSignalCount", "1");
        waitAttributesA.put("signalCounterName", "counter");

        final Map<String, String> waitAttributesB = new HashMap<>();
        waitAttributesB.put("releaseSignalAttribute", "key-B");
        waitAttributesB.put("targetSignalCount", "3");
        waitAttributesB.put("signalCounterName", "counter");

        // Same as B but without the signal id attribute, which makes the FlowFile invalid.
        final Map<String, String> waitAttributesBInvalid = new HashMap<>(waitAttributesB);
        waitAttributesBInvalid.remove("releaseSignalAttribute");

        final TestIteration testIteration = new TestIteration();

        // Contents are encoded explicitly as UTF-8 instead of the platform default charset.
        runner.enqueue("1".getBytes(StandardCharsets.UTF_8), waitAttributesB);
        runner.enqueue("2".getBytes(StandardCharsets.UTF_8), waitAttributesA);
        runner.enqueue("3".getBytes(StandardCharsets.UTF_8), waitAttributesBInvalid);
        runner.enqueue("4".getBytes(StandardCharsets.UTF_8), waitAttributesA);
        runner.enqueue("5".getBytes(StandardCharsets.UTF_8), waitAttributesB);
        runner.enqueue("6".getBytes(StandardCharsets.UTF_8), waitAttributesB);

        // Run 1: nothing has been notified yet.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "5"));
        testIteration.expectedFailed.add("3");
        testIteration.run();

        // Run 2: key-B reaches its target; feed the waiting FlowFiles back into the queue.
        protocol.notify("key-B", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.add("6");
        testIteration.expectedWaiting.add("1");
        testIteration.waiting.forEach(f -> runner.enqueue(f));
        testIteration.run();
    }

    /**
     * Releasing several FlowFiles from one signal: six FlowFiles share signal "key" with a
     * target count of 3 on {@code counter}, a releasable FlowFile count of 6 and a wait
     * buffer count of 2.
     *
     * <p>Run 1 (no signal): "1" and "2" wait. After notifying 3 counts, run 2 releases "3"
     * and "4" and leaves a releasable count of 4; run 3 releases "5" and "6" and leaves a
     * releasable count of 2. In both cases the notified counter is expected to read 0.
     */
    @Test
    public void testReleaseMultipleFlowFiles() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("signalCounterName", "counter");
        waitAttributes.put("fragmentCount", "6");

        final TestIteration testIteration = new TestIteration();

        // Enqueue FlowFiles "1".."6"; contents are encoded explicitly as UTF-8.
        IntStream.range(1, 7).forEach(
                i -> runner.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), waitAttributes));

        // Run 1: no signal exists yet.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
        testIteration.run();
        WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
        Assertions.assertNull(signal);

        // Run 2: the target is reached; waiting FlowFiles are fed back into the queue.
        protocol.notify("key", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
        testIteration.waiting.forEach(f -> runner.enqueue(f));
        testIteration.run();

        // Query the counter that was actually notified ("counter"); the previous name
        // "count" never exists, which made this assertion pass regardless of the state.
        signal = protocol.getSignal("key");
        Assertions.assertEquals(0L, signal.getCount("counter"));
        Assertions.assertEquals(4, signal.getReleasableCount());

        // Run 3: two more FlowFiles are released from the remaining releasable count.
        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
        testIteration.waiting.forEach(f -> runner.enqueue(f));
        testIteration.run();

        signal = protocol.getSignal("key");
        Assertions.assertEquals(0L, signal.getCount("counter"));
        Assertions.assertEquals(2, signal.getReleasableCount());
    }

    /**
     * "Open gate" configuration: the releasable FlowFile count is "0". Six FlowFiles share
     * signal "key" with a target count of 3 on {@code counter} and a wait buffer count of 2.
     *
     * <p>Run 1 (no signal): "1" and "2" wait. After notifying 3 counts, run 2 releases "3"
     * and "4", and run 3 releases "5" and "6". After each of those runs the signal still
     * reports a counter of 3 and a releasable count of 0.
     */
    @Test
    public void testOpenGate() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("signalCounterName", "counter");

        final TestIteration testIteration = new TestIteration();

        // Enqueue FlowFiles "1".."6"; contents are encoded explicitly as UTF-8.
        IntStream.range(1, 7).forEach(
                i -> runner.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), waitAttributes));

        // Run 1: no signal exists yet.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
        testIteration.run();
        WaitNotifyProtocol.Signal signal = protocol.getSignal("key");
        Assertions.assertNull(signal);

        // Run 2: the target is reached; waiting FlowFiles are fed back into the queue.
        protocol.notify("key", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
        testIteration.waiting.forEach(f -> runner.enqueue(f));
        testIteration.run();

        // The counter is left untouched by the release.
        signal = protocol.getSignal("key");
        Assertions.assertEquals(3L, signal.getCount("counter"));
        Assertions.assertEquals(0, signal.getReleasableCount());

        // Run 3: the remaining FlowFiles pass as well, with the signal still unchanged.
        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
        testIteration.waiting.forEach(f -> runner.enqueue(f));
        testIteration.run();

        signal = protocol.getSignal("key");
        Assertions.assertEquals(3L, signal.getCount("counter"));
        Assertions.assertEquals(0, signal.getReleasableCount());
    }

    /**
     * Helper that performs one processor run and checks the outcome against expectations.
     *
     * <p>Callers fill the {@code expected*} lists with the expected FlowFile contents (in
     * order) for each relationship and then call {@link #run()}. After the run, the actual
     * FlowFiles are available in {@link #released}, {@link #waiting} and {@link #failed}
     * until the next call, and the {@code expected*} lists are emptied for reuse.
     */
    private class TestIteration {
        /** FlowFiles transferred to {@link Wait#REL_SUCCESS} by the most recent run. */
        final List<MockFlowFile> released = new ArrayList<>();
        /** FlowFiles transferred to {@link Wait#REL_WAIT} by the most recent run. */
        final List<MockFlowFile> waiting = new ArrayList<>();
        /** FlowFiles transferred to {@link Wait#REL_FAILURE} by the most recent run. */
        final List<MockFlowFile> failed = new ArrayList<>();

        /** Expected contents, in order, of the released FlowFiles for the next run. */
        final List<String> expectedReleased = new ArrayList<>();
        /** Expected contents, in order, of the waiting FlowFiles for the next run. */
        final List<String> expectedWaiting = new ArrayList<>();
        /** Expected contents, in order, of the failed FlowFiles for the next run. */
        final List<String> expectedFailed = new ArrayList<>();

        private TestIteration() {
        }

        /**
         * Runs the processor once, captures the FlowFiles per relationship, asserts that
         * their number and contents match the expectations, then clears the runner's
         * transfer state and the expectation lists.
         */
        void run() {
            released.clear();
            waiting.clear();
            failed.clear();

            runner.run();

            released.addAll(runner.getFlowFilesForRelationship(Wait.REL_SUCCESS));
            waiting.addAll(runner.getFlowFilesForRelationship(Wait.REL_WAIT));
            failed.addAll(runner.getFlowFilesForRelationship(Wait.REL_FAILURE));

            // Check the sizes first so the indexed content comparison below cannot overrun.
            Assertions.assertEquals(expectedReleased.size(), released.size(), "released FlowFile count");
            Assertions.assertEquals(expectedWaiting.size(), waiting.size(), "waiting FlowFile count");
            Assertions.assertEquals(expectedFailed.size(), failed.size(), "failed FlowFile count");

            // Typed consumer: compares each actual FlowFile's content with the expected string.
            final BiConsumer<List<String>, List<MockFlowFile>> assertContents = (expected, actual) -> {
                for (int i = 0; i < expected.size(); i++) {
                    actual.get(i).assertContentEquals(expected.get(i));
                }
            };
            assertContents.accept(expectedReleased, released);
            assertContents.accept(expectedWaiting, waiting);
            assertContents.accept(expectedFailed, failed);

            // Reset for the next iteration; the actual lists stay populated for the caller.
            runner.clearTransferState();
            expectedReleased.clear();
            expectedWaiting.clear();
            expectedFailed.clear();
        }
    }
}

