/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WaitNotifyProtocol {
    private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
    public static final String DEFAULT_COUNT_NAME = "default";
    public static final String CONSUMED_COUNT_NAME = "consumed";
    private static final int MAX_REPLACE_RETRY_COUNT = 5;
    private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final Serializer<String> stringSerializer = (value, output) -> {
        if (value != null) {
            output.write(value.getBytes(StandardCharsets.UTF_8));
        }
    };
    private final Deserializer<String> stringDeserializer = input -> input == null ? null : new String(input, StandardCharsets.UTF_8);
    private final AtomicDistributedMapCacheClient cache;

    public WaitNotifyProtocol(AtomicDistributedMapCacheClient cache) {
        this.cache = cache;
    }

    public Signal notify(String signalId, Map<String, Integer> deltas, Map<String, String> attributes) throws IOException, ConcurrentModificationException {
        for (int i = 0; i < 5; ++i) {
            Signal existingSignal = this.getSignal(signalId);
            Signal signal = existingSignal != null ? existingSignal : new Signal();
            signal.identifier = signalId;
            if (attributes != null) {
                signal.attributes.putAll(attributes);
            }
            deltas.forEach((counterName, delta) -> {
                long count = signal.counts.containsKey(counterName) ? signal.counts.get(counterName) : 0L;
                count = delta == 0 ? 0L : count + (long)delta.intValue();
                signal.counts.put((String)counterName, count);
            });
            if (this.replace(signal)) {
                return signal;
            }
            long waitMillis = 10 * (i + 1);
            logger.info("Waiting for {} ms to retry... {}.{}", new Object[]{waitMillis, signalId, deltas});
            try {
                Thread.sleep(waitMillis);
                continue;
            }
            catch (InterruptedException e) {
                String msg = String.format("Interrupted while waiting for retrying signal [%s] counter [%s].", signalId, deltas);
                throw new ConcurrentModificationException(msg, e);
            }
        }
        String msg = String.format("Failed to update signal [%s] counter [%s] after retrying %d times.", signalId, deltas, 5);
        throw new ConcurrentModificationException(msg);
    }

    public Signal notify(String signalId, String counterName, int delta, Map<String, String> attributes) throws IOException, ConcurrentModificationException {
        HashMap<String, Integer> deltas = new HashMap<String, Integer>();
        deltas.put(counterName, delta);
        return this.notify(signalId, deltas, attributes);
    }

    public Signal getSignal(String signalId) throws IOException, DeserializationException {
        AtomicCacheEntry entry = this.cache.fetch((Object)signalId, stringSerializer, this.stringDeserializer);
        if (entry == null) {
            return null;
        }
        String value = (String)entry.getValue();
        try {
            Signal signal = (Signal)objectMapper.readValue(value, Signal.class);
            signal.identifier = signalId;
            signal.cachedEntry = entry;
            return signal;
        }
        catch (JsonParseException jsonE) {
            try {
                Object attributes = new FlowFileAttributesSerializer().deserialize(value.getBytes(StandardCharsets.UTF_8));
                Signal signal = new Signal();
                signal.identifier = signalId;
                signal.setAttributes((Map<String, String>)attributes);
                signal.getCounts().put(DEFAULT_COUNT_NAME, 1L);
                return signal;
            }
            catch (Exception attrE) {
                String msg = String.format("Cached value for %s was not a serialized Signal nor FlowFileAttributes. Error messages: \"%s\", \"%s\"", signalId, jsonE.getMessage(), attrE.getMessage());
                throw new DeserializationException(msg);
            }
        }
    }

    public void complete(String signalId) throws IOException {
        this.cache.remove((Object)signalId, stringSerializer);
    }

    public boolean replace(Signal signal) throws IOException {
        String signalJson = objectMapper.writeValueAsString((Object)signal);
        if (signal.cachedEntry == null) {
            signal.cachedEntry = new AtomicCacheEntry((Object)signal.identifier, (Object)signalJson, null);
        } else {
            signal.cachedEntry.setValue((Object)signalJson);
        }
        return this.cache.replace(signal.cachedEntry, stringSerializer, stringSerializer);
    }

    public static class Signal {
        private transient String identifier;
        private transient AtomicCacheEntry<String, String, Object> cachedEntry;
        private Map<String, Long> counts = new HashMap<String, Long>();
        private Map<String, String> attributes = new HashMap<String, String>();
        private int releasableCount = 0;

        public Map<String, Long> getCounts() {
            return this.counts;
        }

        public void setCounts(Map<String, Long> counts) {
            this.counts = counts;
        }

        public Map<String, String> getAttributes() {
            return this.attributes;
        }

        public void setAttributes(Map<String, String> attributes) {
            this.attributes = attributes;
        }

        @JsonIgnore
        public long getTotalCount() {
            return this.counts.values().stream().mapToLong(Long::longValue).sum();
        }

        public boolean isTotalCountReached(long targetCount) {
            return this.getTotalCount() >= targetCount;
        }

        public boolean isCountReached(String counterName, long targetCount) {
            return this.getCount(counterName) >= targetCount;
        }

        public long getCount(String counterName) {
            if (counterName == null || counterName.isEmpty()) {
                return this.getTotalCount();
            }
            Long count = this.counts.get(counterName);
            return count != null ? count : 0L;
        }

        public int getReleasableCount() {
            return this.releasableCount;
        }

        public void setReleasableCount(int releasableCount) {
            this.releasableCount = releasableCount;
        }

        public <E> void releaseCandidates(String counterName, long requiredCountForPass, int releasableCandidateCountPerPass, List<E> candidates, Consumer<List<E>> released, Consumer<List<E>> waiting) {
            int candidateSize = candidates.size();
            if (this.releasableCount < candidateSize) {
                long signalCount = this.getCount(counterName);
                this.releasableCount = (int)((long)this.releasableCount + signalCount / requiredCountForPass * (long)releasableCandidateCountPerPass);
                long reducedSignalCount = signalCount % requiredCountForPass;
                if (counterName != null && !counterName.isEmpty()) {
                    this.counts.put(counterName, reducedSignalCount);
                } else {
                    Long consumedCount = this.counts.getOrDefault(WaitNotifyProtocol.CONSUMED_COUNT_NAME, 0L);
                    consumedCount = consumedCount - (signalCount - reducedSignalCount);
                    this.counts.put(WaitNotifyProtocol.CONSUMED_COUNT_NAME, consumedCount);
                }
            }
            int releaseCount = Math.min(this.releasableCount, candidateSize);
            released.accept(candidates.subList(0, releaseCount));
            waiting.accept(candidates.subList(releaseCount, candidateSize));
            this.releasableCount -= releaseCount;
        }
    }
}

