/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processors.standard.Notify;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestNotify {
    private TestRunner runner;
    private MockCacheClient service;

    @BeforeEach
    public void setup() throws InitializationException {
        this.runner = TestRunners.newTestRunner(Notify.class);
        this.service = new MockCacheClient();
        this.runner.addControllerService("service", (ControllerService)this.service);
        this.runner.enableControllerService((ControllerService)this.service);
        this.runner.setProperty(Notify.DISTRIBUTED_CACHE_SERVICE, "service");
    }

    @Test
    public void testNotify() throws InitializationException, IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("releaseSignalAttribute", "1");
        props.put("key", "value");
        this.runner.enqueue(new byte[0], props);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        ((MockFlowFile)this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).get(0)).assertAttributeEquals("notified", "true");
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("1");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"value", cachedAttributes.get("key"));
        Assertions.assertTrue((boolean)signal.isTotalCountReached(1L));
    }

    @Test
    public void testNotifyCounters() throws InitializationException, IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        HashMap<String, String> props1 = new HashMap<String, String>();
        props1.put("releaseSignalAttribute", "someDataProcessing");
        props1.put("key", "data1");
        props1.put("status", "success");
        this.runner.enqueue(new byte[0], props1);
        HashMap<String, String> props2 = new HashMap<String, String>();
        props2.put("releaseSignalAttribute", "someDataProcessing");
        props2.put("key", "data2");
        props2.put("status", "success");
        this.runner.enqueue(new byte[0], props2);
        HashMap<String, String> props3 = new HashMap<String, String>();
        props3.put("releaseSignalAttribute", "someDataProcessing");
        props3.put("key", "data3");
        props3.put("status", "failure");
        this.runner.enqueue(new byte[0], props3);
        this.runner.run(3);
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
        this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("someDataProcessing");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"data3", cachedAttributes.get("key"), (String)"Same attribute key will be overwritten by the latest signal");
        Assertions.assertTrue((boolean)signal.isTotalCountReached(3L));
        Assertions.assertEquals((long)2L, (long)signal.getCount("success"));
        Assertions.assertEquals((long)1L, (long)signal.getCount("failure"));
    }

    @Test
    public void testNotifyCountersBatch() throws InitializationException, IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        this.runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "2");
        HashMap<String, String> props1 = new HashMap<String, String>();
        props1.put("releaseSignalAttribute", "someDataProcessing");
        props1.put("key", "data1");
        props1.put("status", "success");
        this.runner.enqueue(new byte[0], props1);
        HashMap<String, String> props2 = new HashMap<String, String>();
        props2.put("releaseSignalAttribute", "someDataProcessing");
        props2.put("key", "data2");
        props2.put("status", "success");
        this.runner.enqueue(new byte[0], props2);
        HashMap<String, String> props3 = new HashMap<String, String>();
        props3.put("releaseSignalAttribute", "someDataProcessing");
        props3.put("key", "data3");
        props3.put("status", "failure");
        this.runner.enqueue(new byte[0], props3);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
        this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("someDataProcessing");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"data2", cachedAttributes.get("key"), (String)"Same attribute key will be overwritten by the latest signal");
        Assertions.assertTrue((boolean)signal.isTotalCountReached(2L));
        Assertions.assertEquals((long)2L, (long)signal.getCount("success"));
        Assertions.assertEquals((long)0L, (long)signal.getCount("failure"));
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        this.runner.clearTransferState();
        signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("someDataProcessing");
        cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"data3", cachedAttributes.get("key"), (String)"Same attribute key will be overwritten by the latest signal");
        Assertions.assertTrue((boolean)signal.isTotalCountReached(3L));
        Assertions.assertEquals((long)2L, (long)signal.getCount("success"));
        Assertions.assertEquals((long)1L, (long)signal.getCount("failure"));
    }

    @Test
    public void testNotifyCountersUsingDelta() throws InitializationException, IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
        this.runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");
        HashMap<String, String> props1 = new HashMap<String, String>();
        props1.put("releaseSignalAttribute", "someDataProcessing");
        props1.put("key", "data1");
        props1.put("status", "success");
        props1.put("record.count", "1024");
        this.runner.enqueue(new byte[0], props1);
        HashMap<String, String> props2 = new HashMap<String, String>();
        props2.put("releaseSignalAttribute", "someDataProcessing");
        props2.put("key", "data2");
        props2.put("status", "success");
        props2.put("record.count", "2048");
        this.runner.enqueue(new byte[0], props2);
        HashMap<String, String> props3 = new HashMap<String, String>();
        props3.put("releaseSignalAttribute", "someDataProcessing");
        props3.put("key", "data3");
        props3.put("status", "failure");
        props3.put("record.count", "512");
        this.runner.enqueue(new byte[0], props3);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
        this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("someDataProcessing");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"data3", cachedAttributes.get("key"), (String)"Same attribute key will be overwritten by the latest signal");
        Assertions.assertTrue((boolean)signal.isTotalCountReached(3584L));
        Assertions.assertEquals((long)3072L, (long)signal.getCount("success"));
        Assertions.assertEquals((long)512L, (long)signal.getCount("failure"));
    }

    @Test
    public void testIllegalDelta() throws InitializationException, IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        this.runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
        this.runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");
        HashMap<String, String> props1 = new HashMap<String, String>();
        props1.put("releaseSignalAttribute", "someDataProcessing");
        props1.put("key", "data1");
        props1.put("status", "success");
        props1.put("record.count", "1024");
        this.runner.enqueue(new byte[0], props1);
        HashMap<String, String> props2 = new HashMap<String, String>();
        props2.put("releaseSignalAttribute", "someDataProcessing");
        props2.put("key", "data2");
        props2.put("status", "success");
        props2.put("record.count", "2048 records");
        this.runner.enqueue(new byte[0], props2);
        HashMap<String, String> props3 = new HashMap<String, String>();
        props3.put("releaseSignalAttribute", "someDataProcessing");
        props3.put("key", "data3");
        props3.put("status", "failure");
        props3.put("record.count", "512");
        this.runner.enqueue(new byte[0], props3);
        this.runner.run();
        this.runner.assertTransferCount(Notify.REL_SUCCESS, 2);
        this.runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        this.runner.assertTransferCount(Notify.REL_FAILURE, 1);
        this.runner.getFlowFilesForRelationship(Notify.REL_FAILURE).forEach(flowFile -> flowFile.assertAttributeEquals("notified", "false"));
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("someDataProcessing");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"data3", cachedAttributes.get("key"), (String)"Same attribute key will be overwritten by the latest signal");
        Assertions.assertTrue((boolean)signal.isTotalCountReached(1536L));
        Assertions.assertEquals((long)1024L, (long)signal.getCount("success"));
        Assertions.assertEquals((long)512L, (long)signal.getCount("failure"));
    }

    @Test
    public void testRegex() throws IOException {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        this.runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, "key[0-9]*");
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("releaseSignalAttribute", "1");
        props.put("key1", "value");
        props.put("other.key1", "value");
        this.runner.enqueue(new byte[0], props);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        this.runner.clearTransferState();
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol((AtomicDistributedMapCacheClient)this.service).getSignal("1");
        Map cachedAttributes = signal.getAttributes();
        Assertions.assertEquals((Object)"value", cachedAttributes.get("key1"));
        Assertions.assertNull(cachedAttributes.get("other.key1"));
        Assertions.assertTrue((boolean)signal.isTotalCountReached(1L));
    }

    @Test
    public void testEmptyReleaseSignal() {
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        HashMap props = new HashMap();
        this.runner.enqueue(new byte[0], props);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1);
        this.runner.clearTransferState();
    }

    @Test
    public void testFailingCacheService() {
        this.service.setFailOnCalls(true);
        this.runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("releaseSignalAttribute", "2");
        this.runner.enqueue(new byte[0], props);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> this.runner.run()));
        Assertions.assertTrue((boolean)(((Throwable)((Object)e)).getCause() instanceof RuntimeException));
        this.service.setFailOnCalls(false);
    }

    static class MockCacheClient
    extends AbstractControllerService
    implements AtomicDistributedMapCacheClient<Long> {
        private final ConcurrentMap<Object, AtomicCacheEntry<Object, Object, Long>> values = new ConcurrentHashMap<Object, AtomicCacheEntry<Object, Object, Long>>();
        private boolean failOnCalls = false;

        MockCacheClient() {
        }

        void setFailOnCalls(boolean failOnCalls) {
            this.failOnCalls = failOnCalls;
        }

        private void verifyNotFail() throws IOException {
            if (this.failOnCalls) {
                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
            }
        }

        private void unsupported() throws UnsupportedOperationException {
            throw new UnsupportedOperationException("This method shouldn't be used from Notify processor.");
        }

        public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            this.unsupported();
            return false;
        }

        public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException {
            this.unsupported();
            return null;
        }

        public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException {
            this.unsupported();
            return false;
        }

        public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            this.unsupported();
        }

        public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
            this.verifyNotFail();
            AtomicCacheEntry entry = (AtomicCacheEntry)this.values.get(key);
            if (entry == null) {
                return null;
            }
            return (V)valueDeserializer.deserialize(((String)entry.getValue()).getBytes(StandardCharsets.UTF_8));
        }

        public void close() throws IOException {
        }

        public <K> boolean remove(K key, Serializer<K> serializer) throws IOException {
            this.verifyNotFail();
            return this.values.remove(key) != null;
        }

        public long removeByPattern(String regex) throws IOException {
            this.verifyNotFail();
            ArrayList removedRecords = new ArrayList();
            Pattern p = Pattern.compile(regex);
            for (Object key : this.values.keySet()) {
                Matcher m = p.matcher(key.toString());
                if (!m.matches()) continue;
                removedRecords.add(this.values.get(key));
            }
            long numRemoved = removedRecords.size();
            removedRecords.forEach(this.values::remove);
            return numRemoved;
        }

        public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
            this.verifyNotFail();
            return (AtomicCacheEntry)this.values.get(key);
        }

        public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
            this.verifyNotFail();
            Object key = entry.getKey();
            AtomicCacheEntry existing = (AtomicCacheEntry)this.values.get(key);
            if (existing != null && !existing.getRevision().equals(entry.getRevision())) {
                return false;
            }
            this.values.put(key, (AtomicCacheEntry<Object, Object, Long>)new AtomicCacheEntry(key, entry.getValue(), (Object)(entry.getRevision().orElse(0L) + 1L)));
            return true;
        }
    }
}

