/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

public class TestWaitNotifyProtocol {
    /**
     * In-memory stand-in for the cache contents, keyed by entry key. The stubbed
     * {@code fetch} set up in {@link #before()} reads from it and
     * {@link #successfulReplace} writes to it.
     */
    private final Map<String, AtomicCacheEntry<String, String, Long>> cacheEntries = new HashMap<>();

    /** Parses JSON so cached values can be compared as trees instead of raw strings. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** Mocked cache client; a fresh mock is assigned before every test. */
    private AtomicDistributedMapCacheClient<Long> cache;

    /**
     * Mockito answer for {@code replace} that always accepts the update: the
     * submitted entry is stored in {@link #cacheEntries} under its own key with
     * its revision incremented by one (a missing revision counts as 0), and
     * {@code true} is returned.
     */
    private final Answer<?> successfulReplace = invocation -> {
        // Typed rather than raw so getRevision() yields Optional<Long> and the increment is valid arithmetic.
        final AtomicCacheEntry<String, String, Long> submitted = invocation.getArgument(0);
        final long nextRevision = submitted.getRevision().orElse(0L) + 1L;
        this.cacheEntries.put(submitted.getKey(), new AtomicCacheEntry<>(submitted.getKey(), submitted.getValue(), nextRevision));
        return true;
    };

    /**
     * Starts every test from an empty entry map and a fresh mock cache whose
     * {@code fetch} returns whatever entry is stored under the requested key
     * ({@code null} when nothing is stored).
     */
    @BeforeEach
    public void before() throws Exception {
        this.cacheEntries.clear();
        this.cache = Mockito.mock(AtomicDistributedMapCacheClient.class);

        // The first fetch argument is the key; look it up in the in-memory map.
        final Answer<?> lookupByKey = invocation -> this.cacheEntries.get(invocation.getArguments()[0]);
        Mockito.doAnswer(lookupByKey)
                .when(this.cache)
                .fetch(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * With a cache whose {@code replace} always reports failure, {@code notify}
     * is expected to throw a {@link ConcurrentModificationException}.
     */
    @Test
    public void testNotifyRetryFailure() throws Exception {
        // Every replace call on the mock answers false.
        final Answer<?> alwaysRejected = invocation -> false;
        Mockito.doAnswer(alwaysRejected)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final String signalId = "signal-id";

        Assertions.assertThrows(ConcurrentModificationException.class, () -> protocol.notify(signalId, "a", 1, null));
    }

    /**
     * A first notification on an empty cache should return a signal with counter
     * "a" at 1 and leave a revision-1 entry holding the expected JSON in the cache.
     */
    @Test
    public void testNotifyFirst() throws Exception {
        Mockito.doAnswer(this.successfulReplace)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final String signalId = "signal-id";

        // The returned signal already reflects the increment.
        final WaitNotifyProtocol.Signal signal = protocol.notify(signalId, "a", 1, null);
        Assertions.assertNotNull(signal);
        Assertions.assertEquals(Long.valueOf(1L), signal.getCounts().get("a"));

        // The same state must have been written to the cache.
        Assertions.assertTrue(this.cacheEntries.containsKey(signalId));
        final AtomicCacheEntry<String, String, Long> stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(1L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", stored.getValue());
    }

    /**
     * Sends a series of notifications to one signal and checks, after each step,
     * the stored revision and the JSON counter values: repeated increments of one
     * counter, additional counters, a multi-counter delta map, and a delta of 0.
     */
    @Test
    public void testNotifyCounters() throws Exception {
        Mockito.doAnswer(this.successfulReplace)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final String signalId = "signal-id";

        // Two single increments of "a".
        protocol.notify(signalId, "a", 1, null);
        protocol.notify(signalId, "a", 1, null);
        AtomicCacheEntry<String, String, Long> stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(2L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", stored.getValue());

        // A larger delta on the same counter.
        protocol.notify(signalId, "a", 10, null);
        stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(3L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", stored.getValue());

        // Two further counters, one notification each.
        protocol.notify(signalId, "b", 2, null);
        protocol.notify(signalId, "c", 3, null);
        stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(5L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", stored.getValue());

        // Several counters updated by a single notification.
        final Map<String, Integer> increments = new HashMap<>();
        increments.put("a", 10);
        increments.put("b", 25);
        protocol.notify(signalId, increments, null);
        stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(6L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", stored.getValue());

        // A delta of 0 is expected to leave "b" at 0 rather than unchanged.
        protocol.notify(signalId, "b", 0, null);
        stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(7L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", stored.getValue());
    }

    /**
     * Notifies the same signal twice with overlapping attribute maps and checks
     * that the stored JSON keeps attributes from the first call, takes the second
     * call's value for the shared key, and adds the new key.
     */
    @Test
    public void testNotifyAttributes() throws Exception {
        Mockito.doAnswer(this.successfulReplace)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final String signalId = "signal-id";

        // First notification carries p1 and p2.
        final Map<String, String> firstAttributes = new HashMap<>();
        firstAttributes.put("p1", "a1");
        firstAttributes.put("p2", "a1");
        protocol.notify(signalId, "a", 1, firstAttributes);

        AtomicCacheEntry<String, String, Long> stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(1L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", stored.getValue());

        // Second notification carries p2 (shared key) and p3 (new key).
        final Map<String, String> secondAttributes = new HashMap<>();
        secondAttributes.put("p2", "a2");
        secondAttributes.put("p3", "a2");
        protocol.notify(signalId, "a", 1, secondAttributes);

        stored = this.cacheEntries.get(signalId);
        Assertions.assertEquals(2L, stored.getRevision().orElse(-1L).longValue());
        this.assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", stored.getValue());
    }

    /**
     * Checks {@code getSignal} and the count predicates on the returned signal:
     * no signal before the first notification, then per-counter and total
     * thresholds after notifying "success" once and "failure" once.
     */
    @Test
    public void testSignalCount() throws Exception {
        Mockito.doAnswer(this.successfulReplace)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final String signalId = "signal-id";

        // Nothing has been notified yet.
        WaitNotifyProtocol.Signal signal = protocol.getSignal(signalId);
        Assertions.assertNull(signal, "Should be null since there's no signal yet");

        // One "success": that counter and the total both reach 1 but not 2.
        protocol.notify(signalId, "success", 1, null);
        signal = protocol.getSignal(signalId);
        Assertions.assertNotNull(signal);
        Assertions.assertEquals(1L, signal.getCount("success"));
        Assertions.assertTrue(signal.isCountReached("success", 1L));
        Assertions.assertFalse(signal.isCountReached("success", 2L));
        Assertions.assertTrue(signal.isTotalCountReached(1L));
        Assertions.assertFalse(signal.isTotalCountReached(2L));

        // One "failure" on top: each counter is at 1 and the total reaches 2.
        protocol.notify(signalId, "failure", 1, null);
        signal = protocol.getSignal(signalId);
        Assertions.assertNotNull(signal);
        Assertions.assertEquals(1L, signal.getCount("success"));
        Assertions.assertEquals(1L, signal.getCount("failure"));
        Assertions.assertTrue(signal.isCountReached("failure", 1L));
        Assertions.assertFalse(signal.isCountReached("failure", 2L));
        Assertions.assertTrue(signal.isTotalCountReached(1L));
        Assertions.assertTrue(signal.isTotalCountReached(2L));
    }

    /**
     * Seeds the cache with a value produced by {@code FlowFileAttributesSerializer}
     * instead of the JSON signal format and checks that {@code getSignal} still
     * yields a signal: counter "default" at 1 with the serialized attributes
     * available. A value in neither format must raise a
     * {@link DeserializationException}.
     */
    @Test
    public void testNiFiVersionUpgrade() throws Exception {
        Mockito.doAnswer(this.successfulReplace)
                .when(this.cache)
                .replace(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());

        // Build a cache value using the attribute serializer.
        final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer();
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("key1", "value1");
        cachedAttributes.put("key2", "value2");
        cachedAttributes.put("key3", "value3");

        // Use the JDK buffer (fully qualified because the file imports an ActiveMQ
        // class with the same simple name); a plain in-memory sink is all that is needed.
        final java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
        attributesSerializer.serialize(cachedAttributes, bos);

        final String signalId = "old-entry";
        this.cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L));

        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(this.cache);
        final WaitNotifyProtocol.Signal signal = protocol.getSignal(signalId);

        Assertions.assertEquals(1L, signal.getCount("default"));
        Assertions.assertEquals("value1", signal.getAttributes().get("key1"));
        Assertions.assertEquals("value2", signal.getAttributes().get("key2"));
        Assertions.assertEquals("value3", signal.getAttributes().get("key3"));

        // A value that is neither format must be rejected.
        this.cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L));
        Assertions.assertThrows(DeserializationException.class, () -> protocol.getSignal(signalId));
    }

    /**
     * Drives {@code Signal.releaseCandidates} for the "default" counter with ten
     * candidates and, for a range of counter values, checks how many candidates
     * end up released versus waiting, the remaining counter value, and the
     * private {@code releasableCount} field (read reflectively).
     */
    @Test
    public void testReleaseCandidate() throws Exception {
        final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol.Signal();
        final List<Integer> released = new ArrayList<>();
        final List<Integer> waiting = new ArrayList<>();
        final String counterName = "default";

        // Each invocation starts from empty result lists so sizes reflect one call only.
        final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
            released.clear();
            waiting.clear();
            signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates, released::addAll, waiting::addAll);
        };

        final Field releasableCount = WaitNotifyProtocol.Signal.class.getDeclaredField("releasableCount");
        releasableCount.setAccessible(true);

        // No count yet: everything waits.
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(0, released.size());
        Assertions.assertEquals(10, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count below the required 3: still nothing released, count untouched.
        signal.getCounts().put(counterName, 1L);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(0, released.size());
        Assertions.assertEquals(10, waiting.size());
        Assertions.assertEquals(1L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count equal to the required 3: one released, count back to 0.
        signal.getCounts().put(counterName, 3L);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(1, released.size());
        Assertions.assertEquals(9, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count 6 with required 3: two released.
        signal.getCounts().put(counterName, 6L);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(2, released.size());
        Assertions.assertEquals(8, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count 11 with required 3: three released, 2 left on the counter.
        signal.getCounts().put(counterName, 11L);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(3, released.size());
        Assertions.assertEquals(7, waiting.size());
        Assertions.assertEquals(2L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count 6, required 3, two candidates per pass: four released.
        signal.getCounts().put(counterName, 6L);
        releaseCandidate.accept(3L, 2);
        Assertions.assertEquals(4, released.size());
        Assertions.assertEquals(6, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(counterName));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Count far above what ten candidates need: all released, surplus kept in releasableCount.
        signal.getCounts().put(counterName, 50L);
        releaseCandidate.accept(3L, 2);
        Assertions.assertEquals(10, released.size());
        Assertions.assertEquals(0, waiting.size());
        Assertions.assertEquals(2L, signal.getCount(counterName));
        Assertions.assertEquals(22L, releasableCount.getLong(signal));
    }

    /**
     * Drives {@code Signal.releaseCandidates} with a {@code null} counter name
     * while counts are spread over three named counters. For each scenario it
     * checks the released/waiting split, {@code getCount(null)}, the "consumed"
     * counter, and the private {@code releasableCount} field (read reflectively).
     * The "consumed" entry is removed before every scenario so each starts clean.
     */
    @Test
    public void testReleaseCandidateTotal() throws Exception {
        final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol.Signal();
        final List<Integer> released = new ArrayList<>();
        final List<Integer> waiting = new ArrayList<>();
        final String emptyCounterName = null;

        // Each invocation starts from empty result lists so sizes reflect one call only.
        final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
            released.clear();
            waiting.clear();
            signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates, released::addAll, waiting::addAll);
        };

        final String counterA = "counterA";
        final String counterB = "counterB";
        final String counterC = "counterC";
        final String consumed = "consumed";

        final Field releasableCount = WaitNotifyProtocol.Signal.class.getDeclaredField("releasableCount");
        releasableCount.setAccessible(true);

        // No counts at all: everything waits.
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(0, released.size());
        Assertions.assertEquals(10, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(0L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 1, below the required 3: nothing released or consumed.
        signal.getCounts().put(counterA, 1L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(0, released.size());
        Assertions.assertEquals(10, waiting.size());
        Assertions.assertEquals(1L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(0L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 3 across three counters: one released, "consumed" goes to -3.
        signal.getCounts().put(counterA, 1L);
        signal.getCounts().put(counterB, 1L);
        signal.getCounts().put(counterC, 1L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(1, released.size());
        Assertions.assertEquals(9, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(-3L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 6: two released, "consumed" goes to -6.
        signal.getCounts().put(counterA, 1L);
        signal.getCounts().put(counterB, 2L);
        signal.getCounts().put(counterC, 3L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(2, released.size());
        Assertions.assertEquals(8, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(-6L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 11: three released, 2 left over, "consumed" goes to -9.
        signal.getCounts().put(counterA, 3L);
        signal.getCounts().put(counterB, 3L);
        signal.getCounts().put(counterC, 5L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 1);
        Assertions.assertEquals(3, released.size());
        Assertions.assertEquals(7, waiting.size());
        Assertions.assertEquals(2L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(-9L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 6, two candidates per pass: four released.
        signal.getCounts().put(counterA, 1L);
        signal.getCounts().put(counterB, 2L);
        signal.getCounts().put(counterC, 3L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 2);
        Assertions.assertEquals(4, released.size());
        Assertions.assertEquals(6, waiting.size());
        Assertions.assertEquals(0L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(-6L, signal.getCount(consumed));
        Assertions.assertEquals(0L, releasableCount.getLong(signal));

        // Total 50: all ten released, surplus kept in releasableCount.
        signal.getCounts().put(counterA, 10L);
        signal.getCounts().put(counterB, 20L);
        signal.getCounts().put(counterC, 20L);
        signal.getCounts().remove(consumed);
        releaseCandidate.accept(3L, 2);
        Assertions.assertEquals(10, released.size());
        Assertions.assertEquals(0, waiting.size());
        Assertions.assertEquals(2L, signal.getCount(emptyCounterName));
        Assertions.assertEquals(-48L, signal.getCount(consumed));
        Assertions.assertEquals(22L, releasableCount.getLong(signal));
    }

    /**
     * Asserts that two JSON documents are equal after parsing, so the comparison
     * is made on the parsed trees rather than on the raw text.
     *
     * @param expected JSON text describing the expected content
     * @param value    JSON text actually found
     * @throws Exception if either argument cannot be parsed
     */
    public void assertValueEquals(String expected, String value) throws Exception {
        final Object expectedTree = this.mapper.readTree(expected);
        final Object actualTree = this.mapper.readTree(value);
        Assertions.assertEquals(expectedTree, actualTree);
    }
}

