package org.apache.nifi.processors.kafka.pubsub;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.class */
/**
 * Tracks, per {@link FlowFile}, how many messages have been sent and how many have been
 * acknowledged, together with the first failure recorded for each FlowFile.
 *
 * <p>State is held in concurrent maps and atomic counters. Threads blocked in
 * {@link #awaitCompletion(long)} are woken through {@code progressMutex} whenever an
 * acknowledgement or a failure is recorded.
 */
public class InFlightMessageTracker {
    /** Sent/acknowledged counters for every FlowFile seen so far. */
    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
    /** First failure recorded for each FlowFile; later failures never replace it. */
    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
    /** Every distinct exception instance seen per FlowFile; used to log each one only once. */
    private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures = new ConcurrentHashMap<>();
    /** Monitor that {@link #awaitCompletion(long)} waits on and progress events notify. */
    private final Object progressMutex = new Object();
    private final ComponentLog logger;

    /** Pair of atomic counters: messages sent and messages acknowledged. */
    public static class Counts {
        private final AtomicInteger sentCount = new AtomicInteger(0);
        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);

        /** Atomically adds one to the sent counter. */
        public void incrementSentCount() {
            this.sentCount.incrementAndGet();
        }

        /** Atomically adds one to the acknowledged counter. */
        public void incrementAcknowledgedCount() {
            this.acknowledgedCount.incrementAndGet();
        }

        /** @return the current number of acknowledged messages */
        public int getAcknowledgedCount() {
            return this.acknowledgedCount.get();
        }

        /** @return the current number of sent messages */
        public int getSentCount() {
            return this.sentCount.get();
        }
    }

    /**
     * @param componentLog logger used to report send failures
     */
    public InFlightMessageTracker(ComponentLog componentLog) {
        this.logger = componentLog;
    }

    /**
     * Records one acknowledged message for the FlowFile (creating its counters if needed)
     * and wakes any thread waiting in {@link #awaitCompletion(long)}.
     *
     * @param flowFile the FlowFile the acknowledged message belongs to
     */
    public void incrementAcknowledgedCount(FlowFile flowFile) {
        this.messageCountsByFlowFile.computeIfAbsent(flowFile, ignored -> new Counts()).incrementAcknowledgedCount();
        signalProgress();
    }

    /**
     * Registers the FlowFile with zeroed counters unless it is already tracked.
     *
     * @param flowFile the FlowFile to start tracking
     */
    public void trackEmpty(FlowFile flowFile) {
        this.messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
    }

    /**
     * @param flowFile the FlowFile to look up
     * @return the number of acknowledged messages, or 0 if the FlowFile is not tracked
     */
    public int getAcknowledgedCount(FlowFile flowFile) {
        Counts counts = this.messageCountsByFlowFile.get(flowFile);
        return counts == null ? 0 : counts.getAcknowledgedCount();
    }

    /**
     * Records one sent message for the FlowFile, creating its counters if needed.
     *
     * @param flowFile the FlowFile the sent message belongs to
     */
    public void incrementSentCount(FlowFile flowFile) {
        this.messageCountsByFlowFile.computeIfAbsent(flowFile, ignored -> new Counts()).incrementSentCount();
    }

    /**
     * @param flowFile the FlowFile to look up
     * @return the number of sent messages, or 0 if the FlowFile is not tracked
     */
    public int getSentCount(FlowFile flowFile) {
        Counts counts = this.messageCountsByFlowFile.get(flowFile);
        return counts == null ? 0 : counts.getSentCount();
    }

    /**
     * Records a failure for the FlowFile and wakes any thread waiting in
     * {@link #awaitCompletion(long)}. Only the first failure per FlowFile is retained as
     * its reason for failure; each exception not yet seen for the FlowFile is logged once.
     *
     * @param flowFile the FlowFile that failed
     * @param exc the cause of the failure
     */
    public void fail(FlowFile flowFile, Exception exc) {
        this.failures.putIfAbsent(flowFile, exc);
        Set<Exception> seen = this.encounteredFailures.computeIfAbsent(flowFile, ignored -> ConcurrentHashMap.newKeySet());
        // add() returns false when this exception was already recorded, which suppresses duplicate log lines.
        if (seen.add(exc)) {
            this.logger.error("Failed to send {} to Kafka", new Object[]{flowFile, exc});
        }
        signalProgress();
    }

    /**
     * @param flowFile the FlowFile to look up
     * @return the first failure recorded for the FlowFile, or {@code null} if there is none
     */
    public Exception getFailure(FlowFile flowFile) {
        return this.failures.get(flowFile);
    }

    /**
     * @param flowFile the FlowFile to look up
     * @return {@code true} if a failure has been recorded for the FlowFile
     */
    public boolean isFailed(FlowFile flowFile) {
        return getFailure(flowFile) != null;
    }

    /** Discards all counters and all recorded failures. */
    public void reset() {
        this.messageCountsByFlowFile.clear();
        this.failures.clear();
        this.encounteredFailures.clear();
    }

    /**
     * Marks every tracked FlowFile that is not yet complete as failed with the given
     * exception. FlowFiles that already have a failure keep their original one.
     *
     * @param exc the failure to assign to outstanding FlowFiles
     * @return a result view over this tracker's state
     */
    public PublishResult failOutstanding(Exception exc) {
        for (FlowFile flowFile : this.messageCountsByFlowFile.keySet()) {
            if (!isComplete(flowFile)) {
                // putIfAbsent keeps a failure that another thread recorded in the meantime.
                this.failures.putIfAbsent(flowFile, exc);
            }
        }
        return createPublishResult();
    }

    /**
     * A FlowFile is complete when all of its sent messages are acknowledged or it has a
     * recorded failure. A FlowFile with no counters (for example, removed by a concurrent
     * {@link #reset()}) has nothing outstanding and is treated as complete.
     */
    private boolean isComplete(FlowFile flowFile) {
        Counts counts = this.messageCountsByFlowFile.get(flowFile);
        if (counts == null) {
            return true;
        }
        return counts.getAcknowledgedCount() == counts.getSentCount() || this.failures.containsKey(flowFile);
    }

    /** @return {@code true} when every tracked FlowFile is complete */
    private boolean isComplete() {
        return this.messageCountsByFlowFile.keySet().stream().allMatch(this::isComplete);
    }

    /** Wakes all threads blocked in {@link #awaitCompletion(long)} so they re-check completion. */
    private void signalProgress() {
        synchronized (this.progressMutex) {
            // notifyAll rather than notify: every waiter re-checks the condition, so none can be stranded.
            this.progressMutex.notifyAll();
        }
    }

    /**
     * Blocks until every tracked FlowFile is complete or the timeout elapses. Completion is
     * checked before the deadline, so an already-complete tracker returns immediately.
     *
     * <p>Decompiler output indicates this method was package-private in the original source.
     *
     * @param j maximum time to wait, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException if the deadline passes before all FlowFiles are complete
     */
    public void awaitCompletion(long j) throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(j);
        synchronized (this.progressMutex) {
            while (!isComplete()) {
                // Compare by subtraction so the arithmetic stays correct if nanoTime wraps.
                long remainingNanos = deadlineNanos - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException();
                }
                // Wait only for the time that is left; a wake-up for an unrelated FlowFile
                // or a spurious wake-up must not restart the full timeout.
                TimeUnit.NANOSECONDS.timedWait(this.progressMutex, remainingNanos);
            }
        }
    }

    /**
     * Creates a live view over this tracker: the returned object reads the tracker's
     * current state each time one of its methods is called.
     *
     * <p>Decompiler output indicates this method was package-private in the original source.
     *
     * @return a {@link PublishResult} backed by this tracker
     */
    public PublishResult createPublishResult() {
        return new PublishResult() {
            @Override
            public boolean isFailure() {
                return !InFlightMessageTracker.this.failures.isEmpty();
            }

            @Override
            public int getSuccessfulMessageCount(FlowFile flowFile) {
                return InFlightMessageTracker.this.getAcknowledgedCount(flowFile);
            }

            @Override
            public Exception getReasonForFailure(FlowFile flowFile) {
                return InFlightMessageTracker.this.getFailure(flowFile);
            }
        };
    }
}
