package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;

/* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerPool.class */
public class ConsumerPool implements Closeable {
    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
    private final List<String> topics;
    private final Pattern topicPattern;
    private final Map<String, Object> kafkaProperties;
    private final Long maxWaitMillis;
    private final ComponentLog logger;
    private final byte[] demarcatorBytes;
    private final String keyEncoding;
    private final String securityProtocol;
    private final String bootstrapServers;
    private final boolean honorTransactions;
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final Charset headerCharacterSet;
    private final Pattern headerNamePattern;
    private final boolean separateByKey;
    private final int[] partitionsToConsume;
    private final boolean commitOffsets;
    private final OutputStrategy outputStrategy;
    private final String keyFormat;
    private final RecordReaderFactory keyReaderFactory;
    private final AtomicLong consumerCreatedCountRef;
    private final AtomicLong consumerClosedCountRef;
    private final AtomicLong leasesObtainedCountRef;
    private final Queue<List<TopicPartition>> availableTopicPartitions;

    /* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerPool$PoolStats.class */
    /**
     * Immutable snapshot of the pool's lifetime counters: how many Kafka consumers
     * were created, how many were closed, and how many leases were handed out.
     */
    static final class PoolStats {
        final long consumerCreatedCount;
        final long consumerClosedCount;
        final long leasesObtainedCount;

        /**
         * @param j  number of consumers created
         * @param j2 number of consumers closed
         * @param j3 number of leases obtained
         */
        PoolStats(long j, long j2, long j3) {
            this.consumerCreatedCount = j;
            this.consumerClosedCount = j2;
            this.leasesObtainedCount = j3;
        }

        /**
         * Renders the three counters, one per line, each under its own label.
         *
         * @return a three-line, newline-terminated summary
         */
        @Override
        public String toString() {
            // Each label must be paired with its own counter; the previous version
            // printed the created count twice and never printed the leases count.
            return "Created Consumers [" + this.consumerCreatedCount + "]\n"
                    + "Closed Consumers  [" + this.consumerClosedCount + "]\n"
                    + "Leases Obtained   [" + this.leasesObtainedCount + "]\n";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/processors/kafka/pubsub/ConsumerPool$SimpleConsumerLease.class */
    public class SimpleConsumerLease extends ConsumerLease {
        private final Consumer<byte[], byte[]> consumer;
        private final List<TopicPartition> assignedPartitions;
        private volatile ProcessSession session;
        private volatile ProcessContext processContext;
        private volatile boolean closedConsumer;

        private SimpleConsumerLease(Consumer<byte[], byte[]> consumer, List<TopicPartition> list) {
            super(ConsumerPool.this.maxWaitMillis, consumer, ConsumerPool.this.demarcatorBytes, ConsumerPool.this.keyEncoding, ConsumerPool.this.securityProtocol, ConsumerPool.this.bootstrapServers, ConsumerPool.this.readerFactory, ConsumerPool.this.writerFactory, ConsumerPool.this.logger, ConsumerPool.this.headerCharacterSet, ConsumerPool.this.headerNamePattern, ConsumerPool.this.separateByKey, ConsumerPool.this.commitOffsets, ConsumerPool.this.outputStrategy, ConsumerPool.this.keyFormat, ConsumerPool.this.keyReaderFactory);
            this.consumer = consumer;
            this.assignedPartitions = list;
        }

        void setProcessSession(ProcessSession processSession, ProcessContext processContext) {
            this.session = processSession;
            this.processContext = processContext;
        }

        @Override // org.apache.nifi.processors.kafka.pubsub.ConsumerLease
        public List<TopicPartition> getAssignedPartitions() {
            return this.assignedPartitions;
        }

        @Override // org.apache.nifi.processors.kafka.pubsub.ConsumerLease
        public void yield() {
            if (this.processContext != null) {
                this.processContext.yield();
            }
        }

        @Override // org.apache.nifi.processors.kafka.pubsub.ConsumerLease
        public ProcessSession getProcessSession() {
            return this.session;
        }

        @Override // org.apache.nifi.processors.kafka.pubsub.ConsumerLease, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            super.close();
            close(false);
        }

        public void close(boolean z) {
            if (this.closedConsumer) {
                return;
            }
            super.close();
            if (this.session != null) {
                this.session.rollback();
                setProcessSession(null, null);
            }
            if (z || isPoisoned() || !ConsumerPool.this.pooledLeases.offer(this)) {
                this.closedConsumer = true;
                ConsumerPool.this.closeConsumer(this.consumer);
                if (this.assignedPartitions != null) {
                    ConsumerPool.this.logger.debug("Adding partitions {} back to the pool", new Object[]{this.assignedPartitions});
                    ConsumerPool.this.availableTopicPartitions.offer(this.assignedPartitions);
                }
            }
        }
    }

    /**
     * Creates a pool for demarcator-based consumption of an explicit list of topics.
     * No record reader/writer, output strategy, key format or key reader is configured.
     *
     * @param i            not read by this constructor
     * @param bArr         demarcator bytes
     * @param z            whether to separate by key
     * @param map          Kafka client properties; exposed internally through an unmodifiable view
     * @param list         topic names; exposed internally through an unmodifiable view
     * @param l            maximum wait, in milliseconds
     * @param str          key encoding
     * @param str2         security protocol
     * @param str3         bootstrap servers
     * @param componentLog logger used by the pool and its leases
     * @param z2           whether to honor transactions (selects the consumer isolation level)
     * @param charset      character set for headers
     * @param pattern      header name pattern
     * @param iArr         explicitly assigned partition numbers, or {@code null} to subscribe instead
     * @param z3           whether offsets are committed
     */
    public ConsumerPool(int i, byte[] bArr, boolean z, Map<String, Object> map, List<String> list, Long l, String str, String str2, String str3, ComponentLog componentLog, boolean z2, Charset charset, Pattern pattern, int[] iArr, boolean z3) {
        // Counters and queues
        this.consumerCreatedCountRef = new AtomicLong();
        this.consumerClosedCountRef = new AtomicLong();
        this.leasesObtainedCountRef = new AtomicLong();
        this.availableTopicPartitions = new LinkedBlockingQueue<>();
        this.pooledLeases = new LinkedBlockingQueue<>();

        // Connection settings
        this.kafkaProperties = Collections.unmodifiableMap(map);
        this.topics = Collections.unmodifiableList(list);
        this.topicPattern = null;
        this.securityProtocol = str2;
        this.bootstrapServers = str3;
        this.honorTransactions = z2;
        this.maxWaitMillis = l;
        this.commitOffsets = z3;
        this.partitionsToConsume = iArr;
        this.logger = componentLog;

        // Message handling: demarcator mode only
        this.demarcatorBytes = bArr;
        this.keyEncoding = str;
        this.separateByKey = z;
        this.headerCharacterSet = charset;
        this.headerNamePattern = pattern;
        this.readerFactory = null;
        this.writerFactory = null;
        this.outputStrategy = null;
        this.keyFormat = null;
        this.keyReaderFactory = null;

        // Must run last: it reads this.topics.
        enqueueAssignedPartitions(iArr);
    }

    /**
     * Creates a pool for demarcator-based consumption of topics matched by a pattern.
     * No record reader/writer, output strategy, key format or key reader is configured,
     * and the explicit topic list is left {@code null}.
     *
     * @param i            not read by this constructor
     * @param bArr         demarcator bytes
     * @param z            whether to separate by key
     * @param map          Kafka client properties; exposed internally through an unmodifiable view
     * @param pattern      topic name pattern to subscribe to
     * @param l            maximum wait, in milliseconds
     * @param str          key encoding
     * @param str2         security protocol
     * @param str3         bootstrap servers
     * @param componentLog logger used by the pool and its leases
     * @param z2           whether to honor transactions (selects the consumer isolation level)
     * @param charset      character set for headers
     * @param pattern2     header name pattern
     * @param iArr         explicitly assigned partition numbers; building the assignments iterates
     *                     the topic list, which is {@code null} here, so a non-null value fails
     * @param z3           whether offsets are committed
     */
    public ConsumerPool(int i, byte[] bArr, boolean z, Map<String, Object> map, Pattern pattern, Long l, String str, String str2, String str3, ComponentLog componentLog, boolean z2, Charset charset, Pattern pattern2, int[] iArr, boolean z3) {
        // Counters and queues
        this.consumerCreatedCountRef = new AtomicLong();
        this.consumerClosedCountRef = new AtomicLong();
        this.leasesObtainedCountRef = new AtomicLong();
        this.availableTopicPartitions = new LinkedBlockingQueue<>();
        this.pooledLeases = new LinkedBlockingQueue<>();

        // Connection settings
        this.kafkaProperties = Collections.unmodifiableMap(map);
        this.topicPattern = pattern;
        this.topics = null;
        this.securityProtocol = str2;
        this.bootstrapServers = str3;
        this.honorTransactions = z2;
        this.maxWaitMillis = l;
        this.commitOffsets = z3;
        this.partitionsToConsume = iArr;
        this.logger = componentLog;

        // Message handling: demarcator mode only
        this.demarcatorBytes = bArr;
        this.keyEncoding = str;
        this.separateByKey = z;
        this.headerCharacterSet = charset;
        this.headerNamePattern = pattern2;
        this.readerFactory = null;
        this.writerFactory = null;
        this.outputStrategy = null;
        this.keyFormat = null;
        this.keyReaderFactory = null;

        enqueueAssignedPartitions(iArr);
    }

    /**
     * Creates a pool for record-oriented consumption of topics matched by a pattern.
     * No demarcator is configured and the explicit topic list is left {@code null}.
     *
     * @param i                      not read by this constructor
     * @param recordReaderFactory    factory for readers of message values
     * @param recordSetWriterFactory factory for record set writers
     * @param map                    Kafka client properties; exposed internally through an unmodifiable view
     * @param pattern                topic name pattern to subscribe to
     * @param l                      maximum wait, in milliseconds
     * @param str                    security protocol
     * @param str2                   bootstrap servers
     * @param componentLog           logger used by the pool and its leases
     * @param z                      whether to honor transactions (selects the consumer isolation level)
     * @param charset                character set for headers
     * @param pattern2               header name pattern
     * @param z2                     whether to separate by key
     * @param str3                   key encoding
     * @param iArr                   explicitly assigned partition numbers; building the assignments iterates
     *                               the topic list, which is {@code null} here, so a non-null value fails
     * @param z3                     whether offsets are committed
     * @param outputStrategy         output strategy passed on to each lease
     * @param str4                   key format
     * @param recordReaderFactory2   factory for readers of message keys
     */
    public ConsumerPool(int i, RecordReaderFactory recordReaderFactory, RecordSetWriterFactory recordSetWriterFactory, Map<String, Object> map, Pattern pattern, Long l, String str, String str2, ComponentLog componentLog, boolean z, Charset charset, Pattern pattern2, boolean z2, String str3, int[] iArr, boolean z3, OutputStrategy outputStrategy, String str4, RecordReaderFactory recordReaderFactory2) {
        // Counters and queues
        this.consumerCreatedCountRef = new AtomicLong();
        this.consumerClosedCountRef = new AtomicLong();
        this.leasesObtainedCountRef = new AtomicLong();
        this.availableTopicPartitions = new LinkedBlockingQueue<>();
        this.pooledLeases = new LinkedBlockingQueue<>();

        // Connection settings
        this.kafkaProperties = Collections.unmodifiableMap(map);
        this.topicPattern = pattern;
        this.topics = null;
        this.securityProtocol = str;
        this.bootstrapServers = str2;
        this.honorTransactions = z;
        this.maxWaitMillis = l;
        this.commitOffsets = z3;
        this.partitionsToConsume = iArr;
        this.logger = componentLog;

        // Message handling: record mode, so there is no demarcator
        this.demarcatorBytes = null;
        this.readerFactory = recordReaderFactory;
        this.writerFactory = recordSetWriterFactory;
        this.keyReaderFactory = recordReaderFactory2;
        this.keyFormat = str4;
        this.keyEncoding = str3;
        this.outputStrategy = outputStrategy;
        this.separateByKey = z2;
        this.headerCharacterSet = charset;
        this.headerNamePattern = pattern2;

        enqueueAssignedPartitions(iArr);
    }

    /**
     * Creates a pool for record-oriented consumption of an explicit list of topics.
     * No demarcator and no topic pattern are configured.
     *
     * @param i                      not read by this constructor
     * @param recordReaderFactory    factory for readers of message values
     * @param recordSetWriterFactory factory for record set writers
     * @param map                    Kafka client properties; exposed internally through an unmodifiable view
     * @param list                   topic names; exposed internally through an unmodifiable view
     *                               (a {@code null} list is stored as {@code null})
     * @param l                      maximum wait, in milliseconds
     * @param str                    security protocol
     * @param str2                   bootstrap servers
     * @param componentLog           logger used by the pool and its leases
     * @param z                      whether to honor transactions (selects the consumer isolation level)
     * @param charset                character set for headers
     * @param pattern                header name pattern
     * @param z2                     whether to separate by key
     * @param str3                   key encoding
     * @param iArr                   explicitly assigned partition numbers, or {@code null} to subscribe instead
     * @param z3                     whether offsets are committed
     * @param outputStrategy         output strategy passed on to each lease
     * @param str4                   key format
     * @param recordReaderFactory2   factory for readers of message keys
     */
    public ConsumerPool(int i, RecordReaderFactory recordReaderFactory, RecordSetWriterFactory recordSetWriterFactory, Map<String, Object> map, List<String> list, Long l, String str, String str2, ComponentLog componentLog, boolean z, Charset charset, Pattern pattern, boolean z2, String str3, int[] iArr, boolean z3, OutputStrategy outputStrategy, String str4, RecordReaderFactory recordReaderFactory2) {
        this.consumerCreatedCountRef = new AtomicLong();
        this.consumerClosedCountRef = new AtomicLong();
        this.leasesObtainedCountRef = new AtomicLong();
        this.availableTopicPartitions = new LinkedBlockingQueue<>();
        this.pooledLeases = new LinkedBlockingQueue<>();
        this.maxWaitMillis = l;
        this.logger = componentLog;
        this.demarcatorBytes = null;
        this.readerFactory = recordReaderFactory;
        this.writerFactory = recordSetWriterFactory;
        this.securityProtocol = str;
        this.bootstrapServers = str2;
        this.kafkaProperties = Collections.unmodifiableMap(map);
        // Consistent with the demarcator/topic-list constructor: never hold the caller's
        // list directly. Unlike that constructor, a null list was accepted here before,
        // so it is still accepted.
        this.topics = list == null ? null : Collections.unmodifiableList(list);
        this.topicPattern = null;
        this.honorTransactions = z;
        this.headerCharacterSet = charset;
        this.headerNamePattern = pattern;
        this.separateByKey = z2;
        this.keyEncoding = str3;
        this.partitionsToConsume = iArr;
        this.commitOffsets = z3;
        this.outputStrategy = outputStrategy;
        this.keyFormat = str4;
        this.keyReaderFactory = recordReaderFactory2;
        // Must run last: it reads this.topics.
        enqueueAssignedPartitions(iArr);
    }

    /**
     * Determines how many partitions each explicitly named topic has, using a
     * short-lived Kafka consumer that is closed before returning.
     *
     * @return the common partition count of all configured topics, or {@code -1} when
     *         no explicit topic names are configured (topic list is {@code null} or empty)
     * @throws IllegalStateException if two configured topics have different partition counts
     */
    public int getPartitionCount() {
        if (this.topics == null || this.topics.isEmpty()) {
            return -1;
        }

        int partitionsEachTopic = 0;
        // try-with-resources closes the consumer exactly once on every path; the manual
        // expansion it replaces retried close() when the first close() itself failed.
        try (Consumer<byte[], byte[]> consumer = createKafkaConsumer()) {
            for (final String topicName : this.topics) {
                final int partitionsThisTopic = consumer.partitionsFor(topicName).size();
                // A count of 0 means "no topic inspected yet" (or the previous topic
                // reported no partitions), so the comparison is skipped in that case.
                if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
                    throw new IllegalStateException("The specific topic names do not have the same number of partitions");
                }
                partitionsEachTopic = partitionsThisTopic;
            }
        }
        return partitionsEachTopic;
    }

    /**
     * Verifies the pool's configuration against Kafka and reports each step's outcome.
     * <p>
     * A lease is taken from the pool, or created if the pool is empty, and is always
     * force-closed before returning. The steps are:
     * <ol>
     *   <li>fetch the consumer group metadata ("Attempt connection", reported only on failure);</li>
     *   <li>when explicit topics are configured, compute per topic how many messages remain
     *       between the end offset and the later of the beginning and committed offsets;
     *       with a topic pattern this step is reported as skipped;</li>
     *   <li>when a record reader is configured, try to parse polled records. With explicit
     *       topics this is attempted only if at least one message remains.</li>
     * </ol>
     *
     * @return the verification results, in the order the steps were performed
     */
    public List<ConfigVerificationResult> verifyConfiguration() {
        final List<ConfigVerificationResult> results = new ArrayList<>();

        SimpleConsumerLease lease = this.pooledLeases.poll();
        if (lease == null) {
            lease = createConsumerLease();
            if (lease == null) {
                results.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Attempt connection")
                        .outcome(ConfigVerificationResult.Outcome.FAILED)
                        .explanation("Could not obtain a Lease")
                        .build());
                return results;
            }
        }

        try {
            final Consumer<byte[], byte[]> consumer = lease.consumer;
            try {
                consumer.groupMetadata();
            } catch (Exception e) {
                this.logger.error("Failed to fetch Consumer Group Metadata in order to verify processor configuration", e);
                results.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Attempt connection")
                        .outcome(ConfigVerificationResult.Outcome.FAILED)
                        .explanation("Could not fetch Consumer Group Metadata: " + e)
                        .build());
            }

            try {
                if (this.topicPattern == null) {
                    final Duration timeout = Duration.ofSeconds(30L);
                    final Map<String, Long> messagesToConsumePerTopic = new HashMap<>();
                    long totalMessagesToConsume = 0L;

                    for (final String topicName : this.topics) {
                        final Set<TopicPartition> topicPartitions = consumer.partitionsFor(topicName).stream()
                                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                                .collect(Collectors.toSet());

                        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions, timeout);
                        final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions, timeout);
                        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, timeout);

                        for (final TopicPartition topicPartition : endOffsets.keySet()) {
                            final long endOffset = endOffsets.get(topicPartition);
                            final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L);

                            if (endOffset <= beginningOffset) {
                                // Nothing to read in this partition, but still list the topic.
                                messagesToConsumePerTopic.merge(topicPartition.topic(), 0L, Long::sum);
                                continue;
                            }

                            final OffsetAndMetadata committed = committedOffsets.get(topicPartition);
                            final long committedOffset = committed == null ? 0L : committed.offset();
                            // Reading resumes at the committed offset, unless it lies before the
                            // partition's beginning offset.
                            final long messagesToConsume = endOffset - Math.max(beginningOffset, committedOffset);

                            totalMessagesToConsume += messagesToConsume;
                            messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum);
                        }
                    }

                    results.add(new ConfigVerificationResult.Builder()
                            .verificationStepName("Check Offsets")
                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                            .explanation("Successfully determined offsets for " + messagesToConsumePerTopic.size() + " topics. Number of messages left to consume per topic: " + messagesToConsumePerTopic)
                            .build());
                    this.logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", messagesToConsumePerTopic.size(), messagesToConsumePerTopic);

                    if (this.readerFactory != null) {
                        if (totalMessagesToConsume > 0) {
                            results.add(checkRecordIsParsable(lease));
                        } else {
                            results.add(new ConfigVerificationResult.Builder()
                                    .verificationStepName("Parse Records")
                                    .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                                    .explanation("There are no available Records to attempt parsing")
                                    .build());
                        }
                    }
                } else {
                    results.add(new ConfigVerificationResult.Builder()
                            .verificationStepName("Determine Topic Offsets")
                            .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                            .explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name")
                            .build());
                    if (this.readerFactory != null) {
                        results.add(checkRecordIsParsable(lease));
                    }
                }
            } catch (Exception e) {
                this.logger.error("Failed to determine Topic Offsets in order to verify configuration", e);
                results.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Determine Topic Offsets")
                        .outcome(ConfigVerificationResult.Outcome.FAILED)
                        .explanation("Could not fetch Topic Offsets: " + e)
                        .build());
                results.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Parse Records")
                        .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                        .explanation("Could not determine offsets so will not attempt to fetch records")
                        .build());
            }
            return results;
        } finally {
            // The lease used for verification is never returned to the pool.
            lease.close(true);
        }
    }

    /**
     * Polls Kafka once (waiting up to 30 seconds) and tries to parse the value of every
     * received message with the configured record reader.
     * <p>
     * Each message counts as a failure for its topic if parsing throws, or if the reader
     * produces no record at all from it. A message contributes the number of records it
     * yielded (at least 1) to that topic's total.
     *
     * @param simpleConsumerLease lease whose consumer is polled and whose attributes are
     *                            passed to the record reader
     * @return SKIPPED when no messages arrived, SUCCESSFUL when every message parsed, and
     *         otherwise FAILED with a per-topic summary of the topics that had failures
     */
    private ConfigVerificationResult checkRecordIsParsable(SimpleConsumerLease simpleConsumerLease) {
        final ConsumerRecords<byte[], byte[]> consumerRecords = simpleConsumerLease.consumer.poll(Duration.ofSeconds(30L));

        final Map<String, Integer> parseFailuresPerTopic = new HashMap<>();
        final Map<String, String> latestParseFailureDescription = new HashMap<>();
        final Map<String, Integer> recordsPerTopic = new HashMap<>();

        for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
            final String topic = consumerRecord.topic();
            final Map<String, String> attributes = simpleConsumerLease.getAttributes(consumerRecord);
            final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();

            int parsedRecords = 0;
            boolean parseFailed = false;
            try (ByteArrayInputStream in = new ByteArrayInputStream(recordBytes)) {
                // One reader per message: creating a reader per iteration would restart
                // parsing on a stream that has already been partially consumed.
                final RecordReader reader = this.readerFactory.createRecordReader(attributes, in, recordBytes.length, this.logger);
                while (reader.nextRecord() != null) {
                    parsedRecords++;
                }
            } catch (Exception e) {
                parseFailed = true;
                parseFailuresPerTopic.merge(topic, 1, Integer::sum);
                latestParseFailureDescription.put(topic, e.toString());
            }

            if (parsedRecords == 0) {
                // Only report "no Record" when parsing did not already fail; otherwise the
                // message would be counted twice and the real exception text would be lost.
                if (!parseFailed) {
                    parseFailuresPerTopic.merge(topic, 1, Integer::sum);
                    latestParseFailureDescription.put(topic, "Received Kafka message but Record Reader produced no Record from it");
                }
                recordsPerTopic.merge(topic, 1, Integer::sum);
            } else {
                recordsPerTopic.merge(topic, parsedRecords, Integer::sum);
            }
        }

        if (parseFailuresPerTopic.isEmpty()) {
            if (recordsPerTopic.isEmpty()) {
                return new ConfigVerificationResult.Builder()
                        .verificationStepName("Parse Records")
                        .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                        .explanation("Received no messages to attempt parsing within the 30 second timeout")
                        .build();
            }
            return new ConfigVerificationResult.Builder()
                    .verificationStepName("Parse Records")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic)
                    .build();
        }

        // Describe only topics that actually had failures; every such topic also has an
        // entry in recordsPerTopic because each consumed message is counted above.
        final Map<String, String> failureDescriptions = new HashMap<>();
        for (final Map.Entry<String, Integer> failure : parseFailuresPerTopic.entrySet()) {
            final String topic = failure.getKey();
            failureDescriptions.put(topic, "Failed to parse " + failure.getValue() + " out of " + recordsPerTopic.get(topic)
                    + " records. Sample failure reason: " + latestParseFailureDescription.get(topic));
        }
        return new ConfigVerificationResult.Builder()
                .verificationStepName("Parse Records")
                .outcome(ConfigVerificationResult.Outcome.FAILED)
                .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions)
                .build();
    }

    /**
     * Hands out a lease bound to the given session and context. Consumers for any
     * released explicit partition assignments are recreated first; then a pooled lease
     * is reused if one is available, otherwise a new one is created.
     *
     * @param processSession session to attach to the lease
     * @param processContext context to attach to the lease
     * @return the lease, or {@code null} when none is pooled and none can be created
     */
    public ConsumerLease obtainConsumer(ProcessSession processSession, ProcessContext processContext) {
        recreateAssignedConsumers();

        final SimpleConsumerLease pooled = this.pooledLeases.poll();
        final SimpleConsumerLease lease = pooled != null ? pooled : createConsumerLease();
        if (lease == null) {
            return null;
        }

        lease.setProcessSession(processSession, processContext);
        this.leasesObtainedCountRef.incrementAndGet();
        return lease;
    }

    /**
     * Drains the queue of available explicit partition assignments, creating one pooled
     * lease (with its own consumer) per assignment.
     * <p>
     * If creating a consumer fails, the assignment that was just taken off the queue is
     * put back before the exception propagates, so that a later call can retry it
     * instead of the partitions silently never being consumed again.
     */
    private void recreateAssignedConsumers() {
        List<TopicPartition> topicPartitions;
        while ((topicPartitions = this.availableTopicPartitions.poll()) != null) {
            final SimpleConsumerLease lease;
            try {
                lease = createConsumerLease(topicPartitions);
            } catch (RuntimeException e) {
                this.availableTopicPartitions.offer(topicPartitions);
                throw e;
            }
            this.pooledLeases.add(lease);
        }
    }

    /**
     * Creates a lease around a new consumer subscribed to the configured topic list, or
     * to the topic pattern when no list is configured. The lease is registered as the
     * subscription's listener.
     *
     * @return the new lease, or {@code null} when partitions are explicitly assigned
     *         (such leases are only created from the assignment queue)
     */
    private SimpleConsumerLease createConsumerLease() {
        if (this.partitionsToConsume == null) {
            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
            this.consumerCreatedCountRef.incrementAndGet();

            final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, null);
            if (this.topics != null) {
                consumer.subscribe(this.topics, lease);
            } else {
                consumer.subscribe(this.topicPattern, lease);
            }
            return lease;
        }

        this.logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
        return null;
    }

    /**
     * Creates a lease around a new consumer that is explicitly assigned the given partitions.
     *
     * @param list the topic partitions to assign to the new consumer
     * @return a lease that remembers {@code list} as its assigned partitions
     */
    private SimpleConsumerLease createConsumerLease(List<TopicPartition> list) {
        final Consumer<byte[], byte[]> assignedConsumer = createKafkaConsumer();
        this.consumerCreatedCountRef.incrementAndGet();
        assignedConsumer.assign(list);

        final SimpleConsumerLease lease = new SimpleConsumerLease(assignedConsumer, list);
        return lease;
    }

    /**
     * Queues one group of topic partitions per explicitly assigned partition number.
     * Each group holds that partition for every configured topic.
     *
     * @param iArr the partition numbers, or {@code null} when partitions are not
     *             explicitly assigned (nothing is queued in that case)
     */
    private void enqueueAssignedPartitions(int[] iArr) {
        if (iArr != null) {
            for (int index = 0; index < iArr.length; index++) {
                final List<TopicPartition> group = createTopicPartitions(iArr[index]);
                this.availableTopicPartitions.offer(group);
            }
        }
    }

    /**
     * Builds the topic partitions for one partition number across all configured topics.
     *
     * @param i the partition number
     * @return a new list with one {@link TopicPartition} per configured topic, in topic order
     */
    private List<TopicPartition> createTopicPartitions(int i) {
        final List<TopicPartition> partitions = new ArrayList<>();
        for (final String topicName : this.topics) {
            partitions.add(new TopicPartition(topicName, i));
        }
        return partitions;
    }

    /**
     * Creates a Kafka consumer from a copy of the pool's properties, with
     * {@code isolation.level} set according to whether transactions are honored.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    protected Consumer<byte[], byte[]> createKafkaConsumer() {
        final Map<String, Object> consumerProperties = new HashMap<>(this.kafkaProperties);
        final String isolationLevel = this.honorTransactions ? "read_committed" : "read_uncommitted";
        consumerProperties.put("isolation.level", isolationLevel);
        return new KafkaConsumer<>(consumerProperties);
    }

    /**
     * Removes every lease currently sitting in the pool and force-closes it.
     * Leases that are checked out at the time of the call are not touched here.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final List<SimpleConsumerLease> drainedLeases = new ArrayList<>();
        this.pooledLeases.drainTo(drainedLeases);
        for (final SimpleConsumerLease lease : drainedLeases) {
            lease.close(true);
        }
    }

    /**
     * Counts the consumer as closed, then unsubscribes and closes it. Both steps are
     * best-effort: a failure in either is logged as a warning and not propagated, and a
     * failed unsubscribe does not prevent the close attempt.
     *
     * @param consumer the consumer to shut down
     */
    private void closeConsumer(Consumer<?, ?> consumer) {
        this.consumerClosedCountRef.incrementAndGet();

        try {
            consumer.unsubscribe();
        } catch (Exception unsubscribeFailure) {
            this.logger.warn("Failed while unsubscribing " + consumer, unsubscribeFailure);
        }

        try {
            consumer.close();
        } catch (Exception closeFailure) {
            this.logger.warn("Failed while closing " + consumer, closeFailure);
        }
    }

    /**
     * @return a snapshot of the created, closed and leases-obtained counters; the three
     *         values are read one after another, not atomically as a group
     */
    PoolStats getPoolStats() {
        final long created = this.consumerCreatedCountRef.get();
        final long closed = this.consumerClosedCountRef.get();
        final long leasesObtained = this.leasesObtainedCountRef.get();
        return new PoolStats(created, closed, leasesObtained);
    }
}
