/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.InputStream;
import java.lang.invoke.CallSite;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;

public class ConsumerPool
implements Closeable {
    /** Idle leases available for reuse; leases are polled from here when obtained and offered back when closed. */
    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
    /** Explicit topic names to consume, or {@code null} when a topic pattern is used instead. */
    private final List<String> topics;
    /** Topic name pattern to subscribe to, or {@code null} when explicit topic names are used. */
    private final Pattern topicPattern;
    /** Read-only base properties copied into every Kafka consumer this pool creates. */
    private final Map<String, Object> kafkaProperties;
    // The following values are not interpreted by the pool itself; they are handed to each lease it creates.
    private final Long maxWaitMillis;
    private final ComponentLog logger;
    private final byte[] demarcatorBytes;
    private final String keyEncoding;
    private final String securityProtocol;
    private final String bootstrapServers;
    /** When {@code true} consumers are created with isolation level "read_committed", otherwise "read_uncommitted". */
    private final boolean honorTransactions;
    /** Record reader factory; {@code null} for the constructors that take a demarcator instead. */
    private final RecordReaderFactory readerFactory;
    private final RecordSetWriterFactory writerFactory;
    private final Charset headerCharacterSet;
    private final Pattern headerNamePattern;
    private final boolean separateByKey;
    /** Explicitly assigned partition numbers; when non-null, leases are only created per assignment and never by subscription. */
    private final int[] partitionsToConsume;
    private final boolean commitOffsets;
    private final OutputStrategy outputStrategy;
    private final String keyFormat;
    private final RecordReaderFactory keyReaderFactory;
    // Counters exposed through getPoolStats().
    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
    private final AtomicLong consumerClosedCountRef = new AtomicLong();
    private final AtomicLong leasesObtainedCountRef = new AtomicLong();
    /** Partition assignments that currently have no lease; refilled when an assigned lease's consumer is closed. */
    private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<List<TopicPartition>>();

    /**
     * Creates a pool that consumes an explicit list of topics without record reader/writer factories.
     * <p>
     * The reader factory, writer factory, output strategy, key format and key reader factory are all
     * left {@code null}. {@code maxConcurrentLeases} is accepted but not used by this constructor.
     *
     * @param kafkaProperties base consumer properties; stored as an unmodifiable view
     * @param topics topic names to consume; stored as an unmodifiable view
     * @param partitionsToConsume partition numbers to assign explicitly, or {@code null} to subscribe
     */
    public ConsumerPool(int maxConcurrentLeases, byte[] demarcator, boolean separateByKey, Map<String, Object> kafkaProperties, List<String> topics, Long maxWaitMillis, String keyEncoding, String securityProtocol, String bootstrapServers, ComponentLog logger, boolean honorTransactions, Charset headerCharacterSet, Pattern headerNamePattern, int[] partitionsToConsume, boolean commitOffsets) {
        // Pool bookkeeping.
        this.pooledLeases = new LinkedBlockingQueue<>();
        this.logger = logger;
        this.maxWaitMillis = maxWaitMillis;

        // Kafka connection and topic selection (explicit names, no pattern).
        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
        this.topics = Collections.unmodifiableList(topics);
        this.topicPattern = null;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.honorTransactions = honorTransactions;
        this.partitionsToConsume = partitionsToConsume;
        this.commitOffsets = commitOffsets;

        // Message handling options passed on to each lease.
        this.demarcatorBytes = demarcator;
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;

        // Record-oriented settings do not apply to this constructor.
        this.readerFactory = null;
        this.writerFactory = null;
        this.outputStrategy = null;
        this.keyFormat = null;
        this.keyReaderFactory = null;

        // Must run last: it reads this.topics to build the assignments.
        this.enqueueAssignedPartitions(partitionsToConsume);
    }

    /**
     * Creates a pool that consumes topics matching a pattern without record reader/writer factories.
     * <p>
     * The explicit topic list, reader factory, writer factory, output strategy, key format and key
     * reader factory are all left {@code null}. {@code maxConcurrentLeases} is accepted but not used
     * by this constructor.
     *
     * @param kafkaProperties base consumer properties; stored as an unmodifiable view
     * @param topics pattern of topic names to subscribe to
     * @param partitionsToConsume partition numbers to assign explicitly, or {@code null} to subscribe
     */
    public ConsumerPool(int maxConcurrentLeases, byte[] demarcator, boolean separateByKey, Map<String, Object> kafkaProperties, Pattern topics, Long maxWaitMillis, String keyEncoding, String securityProtocol, String bootstrapServers, ComponentLog logger, boolean honorTransactions, Charset headerCharacterSet, Pattern headerNamePattern, int[] partitionsToConsume, boolean commitOffsets) {
        // Pool bookkeeping.
        this.pooledLeases = new LinkedBlockingQueue<>();
        this.logger = logger;
        this.maxWaitMillis = maxWaitMillis;

        // Kafka connection and topic selection (pattern, no explicit names).
        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
        this.topicPattern = topics;
        this.topics = null;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.honorTransactions = honorTransactions;
        this.partitionsToConsume = partitionsToConsume;
        this.commitOffsets = commitOffsets;

        // Message handling options passed on to each lease.
        this.demarcatorBytes = demarcator;
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;

        // Record-oriented settings do not apply to this constructor.
        this.readerFactory = null;
        this.writerFactory = null;
        this.outputStrategy = null;
        this.keyFormat = null;
        this.keyReaderFactory = null;

        // Must run last: it reads the fields assigned above.
        this.enqueueAssignedPartitions(partitionsToConsume);
    }

    /**
     * Creates a record-oriented pool that consumes topics matching a pattern.
     * <p>
     * The demarcator and the explicit topic list are left {@code null}. {@code maxConcurrentLeases}
     * is accepted but not used by this constructor.
     *
     * @param readerFactory factory for record readers, passed to each lease
     * @param writerFactory factory for record set writers, passed to each lease
     * @param kafkaProperties base consumer properties; stored as an unmodifiable view
     * @param topics pattern of topic names to subscribe to
     * @param partitionsToConsume partition numbers to assign explicitly, or {@code null} to subscribe
     */
    public ConsumerPool(int maxConcurrentLeases, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, Map<String, Object> kafkaProperties, Pattern topics, Long maxWaitMillis, String securityProtocol, String bootstrapServers, ComponentLog logger, boolean honorTransactions, Charset headerCharacterSet, Pattern headerNamePattern, boolean separateByKey, String keyEncoding, int[] partitionsToConsume, boolean commitOffsets, OutputStrategy outputStrategy, String keyFormat, RecordReaderFactory keyReaderFactory) {
        // Pool bookkeeping.
        this.pooledLeases = new LinkedBlockingQueue<>();
        this.logger = logger;
        this.maxWaitMillis = maxWaitMillis;

        // Kafka connection and topic selection (pattern, no explicit names).
        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
        this.topicPattern = topics;
        this.topics = null;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.honorTransactions = honorTransactions;
        this.partitionsToConsume = partitionsToConsume;
        this.commitOffsets = commitOffsets;

        // Record-oriented settings; no demarcator in this mode.
        this.demarcatorBytes = null;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.outputStrategy = outputStrategy;
        this.keyFormat = keyFormat;
        this.keyReaderFactory = keyReaderFactory;

        // Message handling options passed on to each lease.
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;

        // Must run last: it reads the fields assigned above.
        this.enqueueAssignedPartitions(partitionsToConsume);
    }

    /**
     * Creates a record-oriented pool that consumes an explicit list of topics.
     * <p>
     * The demarcator and the topic pattern are left {@code null}. {@code maxConcurrentLeases} is
     * accepted but not used by this constructor.
     *
     * @param readerFactory factory for record readers, passed to each lease
     * @param writerFactory factory for record set writers, passed to each lease
     * @param kafkaProperties base consumer properties; stored as an unmodifiable view
     * @param topics topic names to consume; stored as an unmodifiable view so the pool cannot
     *               modify the caller's list, matching the demarcator-based constructor
     * @param partitionsToConsume partition numbers to assign explicitly, or {@code null} to subscribe
     */
    public ConsumerPool(int maxConcurrentLeases, RecordReaderFactory readerFactory, RecordSetWriterFactory writerFactory, Map<String, Object> kafkaProperties, List<String> topics, Long maxWaitMillis, String securityProtocol, String bootstrapServers, ComponentLog logger, boolean honorTransactions, Charset headerCharacterSet, Pattern headerNamePattern, boolean separateByKey, String keyEncoding, int[] partitionsToConsume, boolean commitOffsets, OutputStrategy outputStrategy, String keyFormat, RecordReaderFactory keyReaderFactory) {
        this.pooledLeases = new LinkedBlockingQueue<>();
        this.maxWaitMillis = maxWaitMillis;
        this.logger = logger;
        this.demarcatorBytes = null;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;
        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
        // Wrap the list like the sibling List<String> constructor does. A null list is still
        // accepted here because this constructor never rejected it before.
        this.topics = topics == null ? null : Collections.unmodifiableList(topics);
        this.topicPattern = null;
        this.honorTransactions = honorTransactions;
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.partitionsToConsume = partitionsToConsume;
        this.commitOffsets = commitOffsets;
        this.outputStrategy = outputStrategy;
        this.keyFormat = keyFormat;
        this.keyReaderFactory = keyReaderFactory;
        // Must run last: it reads this.topics to build the assignments.
        this.enqueueAssignedPartitions(partitionsToConsume);
    }

    /**
     * Determines how many partitions each explicitly named topic has, using a short-lived consumer.
     *
     * @return the partition count of the last topic inspected, or {@code -1} when no explicit
     *         topic names are configured
     * @throws IllegalStateException if a topic's partition count differs from the (non-zero) count
     *         seen for the previous topic
     */
    public int getPartitionCount() {
        final boolean hasExplicitTopics = this.topics != null && !this.topics.isEmpty();
        if (!hasExplicitTopics) {
            return -1;
        }

        int expectedCount = 0;
        try (Consumer<byte[], byte[]> consumer = this.createKafkaConsumer()) {
            for (final String topicName : this.topics) {
                final int countForTopic = consumer.partitionsFor(topicName).size();
                final boolean mismatch = expectedCount != 0 && countForTopic != expectedCount;
                if (mismatch) {
                    throw new IllegalStateException("The specific topic names do not have the same number of partitions");
                }
                expectedCount = countForTopic;
            }
        }
        return expectedCount;
    }

    /**
     * Verifies the pool's configuration by exercising a single lease against Kafka.
     * <p>
     * The steps are:
     * <ol>
     *   <li>Obtain a lease (pooled or newly created); if none can be obtained a single FAILED
     *       result is returned.</li>
     *   <li>Fetch the consumer group metadata; a failure is recorded but does not stop the
     *       remaining steps.</li>
     *   <li>When explicit topic names are configured, compute the number of messages left to
     *       consume per topic from the end, beginning and committed offsets. When a topic pattern
     *       is configured this step is reported as SKIPPED.</li>
     *   <li>When a record reader factory is configured, poll and attempt to parse records.</li>
     * </ol>
     * The lease used for verification is always force-closed before returning.
     *
     * @return the results of the verification steps, in the order they were performed
     */
    public List<ConfigVerificationResult> verifyConfiguration() {
        final List<ConfigVerificationResult> verificationResults = new ArrayList<>();

        SimpleConsumerLease lease = this.pooledLeases.poll();
        if (lease == null) {
            lease = this.createConsumerLease();
        }
        if (lease == null) {
            verificationResults.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Attempt connection")
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation("Could not obtain a Lease")
                    .build());
            return verificationResults;
        }

        try {
            final Consumer<byte[], byte[]> consumer = lease.consumer;
            try {
                consumer.groupMetadata();
            } catch (final Exception e) {
                this.logger.error("Failed to fetch Consumer Group Metadata in order to verify processor configuration", e);
                verificationResults.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Attempt connection")
                        .outcome(ConfigVerificationResult.Outcome.FAILED)
                        .explanation("Could not fetch Consumer Group Metadata: " + e)
                        .build());
            }

            try {
                if (this.topicPattern == null) {
                    final Duration timeout = Duration.ofSeconds(30L);
                    final Map<String, Long> messagesToConsumePerTopic = new HashMap<>();
                    long toConsume = 0L;

                    for (final String topicName : this.topics) {
                        final Set<TopicPartition> topicPartitions = consumer.partitionsFor(topicName).stream()
                                .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                                .collect(Collectors.toSet());

                        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions, timeout);
                        final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions, timeout);
                        final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions, timeout);

                        for (final Map.Entry<TopicPartition, Long> endOffsetEntry : endOffsets.entrySet()) {
                            final TopicPartition topicPartition = endOffsetEntry.getKey();
                            final long endOffset = endOffsetEntry.getValue();
                            final long beginningOffset = beginningOffsets.getOrDefault(topicPartition, 0L);

                            // Nothing between the beginning and the end of the partition: record the
                            // topic with zero messages so that it still shows up in the summary.
                            if (endOffset <= beginningOffset) {
                                messagesToConsumePerTopic.merge(topicPartition.topic(), 0L, Long::sum);
                                continue;
                            }

                            final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
                            final long committedOffset = offsetAndMetadata == null ? 0L : offsetAndMetadata.offset();

                            // Count from whichever is later: the committed position or the first
                            // offset still present in the partition.
                            final long currentOffset = Math.max(beginningOffset, committedOffset);
                            final long messagesToConsume = endOffset - currentOffset;
                            toConsume += messagesToConsume;
                            messagesToConsumePerTopic.merge(topicPartition.topic(), messagesToConsume, Long::sum);
                        }
                    }

                    verificationResults.add(new ConfigVerificationResult.Builder()
                            .verificationStepName("Check Offsets")
                            .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                            .explanation("Successfully determined offsets for " + messagesToConsumePerTopic.size() + " topics. Number of messages left to consume per topic: " + messagesToConsumePerTopic)
                            .build());
                    this.logger.info("Successfully determined offsets for {} topics. Number of messages left to consume per topic: {}", new Object[]{messagesToConsumePerTopic.size(), messagesToConsumePerTopic});

                    if (this.readerFactory != null) {
                        if (toConsume > 0L) {
                            verificationResults.add(this.checkRecordIsParsable(lease));
                        } else {
                            verificationResults.add(new ConfigVerificationResult.Builder()
                                    .verificationStepName("Parse Records")
                                    .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                                    .explanation("There are no available Records to attempt parsing")
                                    .build());
                        }
                    }
                } else {
                    verificationResults.add(new ConfigVerificationResult.Builder()
                            .verificationStepName("Determine Topic Offsets")
                            .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                            .explanation("Cannot determine Topic Offsets because a Topic Wildcard was used instead of an explicit Topic Name")
                            .build());
                    if (this.readerFactory != null) {
                        verificationResults.add(this.checkRecordIsParsable(lease));
                    }
                }
            } catch (final Exception e) {
                this.logger.error("Failed to determine Topic Offsets in order to verify configuration", e);
                verificationResults.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Determine Topic Offsets")
                        .outcome(ConfigVerificationResult.Outcome.FAILED)
                        .explanation("Could not fetch Topic Offsets: " + e)
                        .build());
                verificationResults.add(new ConfigVerificationResult.Builder()
                        .verificationStepName("Parse Records")
                        .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                        .explanation("Could not determine offsets so will not attempt to fetch records")
                        .build());
            }

            return verificationResults;
        } finally {
            // Force-close so the consumer used for verification is not returned to the pool.
            lease.close(true);
        }
    }

    /**
     * Polls the lease's consumer once (30 second timeout) and attempts to parse every received
     * message with the configured record reader factory.
     * <p>
     * A message counts as a parse failure when reading it throws, or when the reader yields no
     * record at all. Each message is counted as exactly one failure at most; when reading throws,
     * the exception text is kept as the sample failure reason.
     *
     * @param consumerLease the lease whose consumer is polled and whose attributes are passed to the reader
     * @return SKIPPED when no messages were received, SUCCESSFUL when every message parsed, otherwise
     *         FAILED with a per-topic description listing only the topics that had failures
     */
    private ConfigVerificationResult checkRecordIsParsable(SimpleConsumerLease consumerLease) {
        final ConsumerRecords<byte[], byte[]> consumerRecords = consumerLease.consumer.poll(Duration.ofSeconds(30L));

        final Map<String, Integer> parseFailuresPerTopic = new HashMap<>();
        final Map<String, String> latestParseFailureDescription = new HashMap<>();
        final Map<String, Integer> recordsPerTopic = new HashMap<>();

        for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
            final String topic = consumerRecord.topic();
            final Map<String, String> attributes = consumerLease.getAttributes(consumerRecord);
            final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();

            int numRecords = 0;
            boolean readFailed = false;
            // Both the stream and the reader are closed once the message has been read.
            try (InputStream in = new ByteArrayInputStream(recordBytes);
                 RecordReader reader = this.readerFactory.createRecordReader(attributes, in, recordBytes.length, this.logger)) {
                while (reader.nextRecord() != null) {
                    numRecords++;
                }
            } catch (final Exception e) {
                readFailed = true;
                parseFailuresPerTopic.merge(topic, 1, Integer::sum);
                latestParseFailureDescription.put(topic, e.toString());
            }

            // Only report "no Record" when reading did not already fail; otherwise the same
            // message would be counted twice and the real exception text would be lost.
            if (numRecords == 0 && !readFailed) {
                parseFailuresPerTopic.merge(topic, 1, Integer::sum);
                latestParseFailureDescription.put(topic, "Received Kafka message but Record Reader produced no Record from it");
            }

            // A message that produced no record still counts as one record for the totals.
            recordsPerTopic.merge(topic, Math.max(numRecords, 1), Integer::sum);
        }

        if (parseFailuresPerTopic.isEmpty()) {
            if (recordsPerTopic.isEmpty()) {
                return new ConfigVerificationResult.Builder()
                        .verificationStepName("Parse Records")
                        .outcome(ConfigVerificationResult.Outcome.SKIPPED)
                        .explanation("Received no messages to attempt parsing within the 30 second timeout")
                        .build();
            }
            return new ConfigVerificationResult.Builder()
                    .verificationStepName("Parse Records")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic)
                    .build();
        }

        // Describe only the topics that actually had failures.
        final Map<String, String> failureDescriptions = new HashMap<>();
        for (final Map.Entry<String, Integer> failureEntry : parseFailuresPerTopic.entrySet()) {
            final String topic = failureEntry.getKey();
            final int records = recordsPerTopic.getOrDefault(topic, 0);
            final String failureReason = latestParseFailureDescription.get(topic);
            failureDescriptions.put(topic, "Failed to parse " + failureEntry.getValue() + " out of " + records + " records. Sample failure reason: " + failureReason);
        }

        return new ConfigVerificationResult.Builder()
                .verificationStepName("Parse Records")
                .outcome(ConfigVerificationResult.Outcome.FAILED)
                .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions)
                .build();
    }

    /**
     * Obtains a lease bound to the given session and context.
     * <p>
     * Leases for any unleased explicit partition assignments are created first; then a pooled
     * lease is reused if one is available, otherwise a new one is created.
     *
     * @param session the process session to bind to the lease
     * @param processContext the process context to bind to the lease
     * @return the lease, or {@code null} if none is pooled and a new one cannot be created
     */
    public ConsumerLease obtainConsumer(ProcessSession session, ProcessContext processContext) {
        this.recreateAssignedConsumers();

        SimpleConsumerLease lease = this.pooledLeases.poll();
        if (lease == null) {
            lease = this.createConsumerLease();
            if (lease == null) {
                return null;
            }
        }

        lease.setProcessSession(session, processContext);
        this.leasesObtainedCountRef.incrementAndGet();
        return lease;
    }

    /**
     * Creates a lease for every partition assignment that currently has none and adds it to the pool.
     * <p>
     * If creating a lease fails, the assignment is put back on the queue before the exception is
     * rethrown. Otherwise the assignment would be lost and a later call could never create a
     * lease for those partitions.
     */
    private void recreateAssignedConsumers() {
        List<TopicPartition> topicPartitions;
        while ((topicPartitions = this.availableTopicPartitions.poll()) != null) {
            final SimpleConsumerLease lease;
            try {
                lease = this.createConsumerLease(topicPartitions);
            } catch (final RuntimeException e) {
                // Keep the assignment available for the next attempt.
                this.availableTopicPartitions.offer(topicPartitions);
                throw e;
            }
            this.pooledLeases.add(lease);
        }
    }

    /**
     * Creates a new subscription-based lease, subscribing to the explicit topic names when they
     * are set and to the topic pattern otherwise. The lease is registered as the rebalance listener.
     *
     * @return the new lease, or {@code null} when partitions are explicitly assigned, in which
     *         case no subscription-based lease is created
     */
    private SimpleConsumerLease createConsumerLease() {
        final boolean explicitlyAssigned = this.partitionsToConsume != null;
        if (explicitlyAssigned) {
            this.logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
            return null;
        }

        final Consumer<byte[], byte[]> kafkaConsumer = this.createKafkaConsumer();
        this.consumerCreatedCountRef.incrementAndGet();
        final SimpleConsumerLease newLease = new SimpleConsumerLease(kafkaConsumer, null);

        if (this.topics != null) {
            kafkaConsumer.subscribe(this.topics, (ConsumerRebalanceListener) newLease);
        } else {
            kafkaConsumer.subscribe(this.topicPattern, (ConsumerRebalanceListener) newLease);
        }
        return newLease;
    }

    /**
     * Creates a new lease whose consumer is explicitly assigned the given partitions.
     *
     * @param topicPartitions the partitions to assign to the new consumer
     * @return a lease that remembers the assigned partitions
     */
    private SimpleConsumerLease createConsumerLease(List<TopicPartition> topicPartitions) {
        final Consumer<byte[], byte[]> kafkaConsumer = this.createKafkaConsumer();
        this.consumerCreatedCountRef.incrementAndGet();
        kafkaConsumer.assign(topicPartitions);
        return new SimpleConsumerLease(kafkaConsumer, topicPartitions);
    }

    /**
     * Queues one partition assignment per requested partition number so that leases can be
     * created for them later. Does nothing when no partitions are given.
     *
     * @param partitionsToConsume the partition numbers to assign, or {@code null} for none
     */
    private void enqueueAssignedPartitions(int[] partitionsToConsume) {
        if (partitionsToConsume != null) {
            for (int index = 0; index < partitionsToConsume.length; index++) {
                this.availableTopicPartitions.offer(this.createTopicPartitions(partitionsToConsume[index]));
            }
        }
    }

    /**
     * Builds the assignment for one partition number: that partition of every explicit topic.
     *
     * @param partition the partition number
     * @return a new list with one {@link TopicPartition} per configured topic name, in topic order
     */
    private List<TopicPartition> createTopicPartitions(int partition) {
        return this.topics.stream()
                .map(topic -> new TopicPartition(topic, partition))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Creates a Kafka consumer from a copy of the pool's properties, with the isolation level set
     * to "read_committed" when transactions are honored and "read_uncommitted" otherwise.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    protected Consumer<byte[], byte[]> createKafkaConsumer() {
        // Copy first: the pool's own property map is an unmodifiable view.
        final Map<String, Object> properties = new HashMap<>(this.kafkaProperties);
        properties.put("isolation.level", this.honorTransactions ? "read_committed" : "read_uncommitted");
        return new KafkaConsumer<>(properties);
    }

    /**
     * Drains all pooled leases and force-closes each of them. Leases that are currently handed
     * out are not in the pool and are therefore not touched here.
     */
    @Override
    public void close() {
        final List<SimpleConsumerLease> leases = new ArrayList<>();
        this.pooledLeases.drainTo(leases);
        for (final SimpleConsumerLease lease : leases) {
            lease.close(true);
        }
    }

    /**
     * Unsubscribes and closes the given consumer, counting it as closed. Failures in either step
     * are logged as warnings and do not prevent the other step from running.
     *
     * @param consumer the consumer to shut down
     */
    private void closeConsumer(Consumer<?, ?> consumer) {
        this.consumerClosedCountRef.incrementAndGet();

        try {
            consumer.unsubscribe();
        } catch (Exception unsubscribeFailure) {
            final String message = "Failed while unsubscribing " + consumer;
            this.logger.warn(message, unsubscribeFailure);
        }

        try {
            consumer.close();
        } catch (Exception closeFailure) {
            final String message = "Failed while closing " + consumer;
            this.logger.warn(message, closeFailure);
        }
    }

    /**
     * Takes a snapshot of the pool's counters.
     *
     * @return the number of consumers created, consumers closed and leases obtained so far
     */
    PoolStats getPoolStats() {
        final long created = this.consumerCreatedCountRef.get();
        final long closed = this.consumerClosedCountRef.get();
        final long obtained = this.leasesObtainedCountRef.get();
        return new PoolStats(created, closed, obtained);
    }

    /**
     * Immutable snapshot of the pool's counters.
     */
    static final class PoolStats {
        /** Number of consumers created. */
        final long consumerCreatedCount;
        /** Number of consumers closed. */
        final long consumerClosedCount;
        /** Number of leases obtained. */
        final long leasesObtainedCount;

        PoolStats(long consumerCreatedCount, long consumerClosedCount, long leasesObtainedCount) {
            this.consumerCreatedCount = consumerCreatedCount;
            this.consumerClosedCount = consumerClosedCount;
            this.leasesObtainedCount = leasesObtainedCount;
        }

        /**
         * @return the three counters, one per line, each line terminated by a newline
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("Created Consumers [").append(this.consumerCreatedCount).append("]\n");
            text.append("Closed Consumers  [").append(this.consumerClosedCount).append("]\n");
            text.append("Leases Obtained   [").append(this.leasesObtainedCount).append("]\n");
            return text.toString();
        }
    }

    private class SimpleConsumerLease
    extends ConsumerLease {
        private final Consumer<byte[], byte[]> consumer;
        private final List<TopicPartition> assignedPartitions;
        private volatile ProcessSession session;
        private volatile ProcessContext processContext;
        private volatile boolean closedConsumer;

        private SimpleConsumerLease(Consumer<byte[], byte[]> consumer, List<TopicPartition> assignedPartitions) {
            super(ConsumerPool.this.maxWaitMillis, consumer, ConsumerPool.this.demarcatorBytes, ConsumerPool.this.keyEncoding, ConsumerPool.this.securityProtocol, ConsumerPool.this.bootstrapServers, ConsumerPool.this.readerFactory, ConsumerPool.this.writerFactory, ConsumerPool.this.logger, ConsumerPool.this.headerCharacterSet, ConsumerPool.this.headerNamePattern, ConsumerPool.this.separateByKey, ConsumerPool.this.commitOffsets, ConsumerPool.this.outputStrategy, ConsumerPool.this.keyFormat, ConsumerPool.this.keyReaderFactory);
            this.consumer = consumer;
            this.assignedPartitions = assignedPartitions;
        }

        void setProcessSession(ProcessSession session, ProcessContext context) {
            this.session = session;
            this.processContext = context;
        }

        @Override
        public List<TopicPartition> getAssignedPartitions() {
            return this.assignedPartitions;
        }

        @Override
        public void yield() {
            if (this.processContext != null) {
                this.processContext.yield();
            }
        }

        @Override
        public ProcessSession getProcessSession() {
            return this.session;
        }

        @Override
        public void close() {
            super.close();
            this.close(false);
        }

        public void close(boolean forceClose) {
            if (this.closedConsumer) {
                return;
            }
            super.close();
            if (this.session != null) {
                this.session.rollback();
                this.setProcessSession(null, null);
            }
            if (forceClose || this.isPoisoned() || !ConsumerPool.this.pooledLeases.offer(this)) {
                this.closedConsumer = true;
                ConsumerPool.this.closeConsumer(this.consumer);
                if (this.assignedPartitions != null) {
                    ConsumerPool.this.logger.debug("Adding partitions {} back to the pool", new Object[]{this.assignedPartitions});
                    ConsumerPool.this.availableTopicPartitions.offer(this.assignedPartitions);
                }
            }
        }
    }
}

