/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
import org.apache.nifi.kafka.shared.property.KeyEncoding;
import org.apache.nifi.kafka.shared.property.KeyFormat;
import org.apache.nifi.kafka.shared.property.OutputStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_3;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;

public abstract class ConsumerLease
implements Closeable,
ConsumerRebalanceListener {
    // Field descriptor used for the wrapper record's "key" when the key format is RECORD and the Kafka key is empty.
    private static final RecordField EMPTY_SCHEMA_KEY_RECORD_FIELD = new RecordField("key", RecordFieldType.RECORD.getRecordDataType((RecordSchema)new SimpleRecordSchema(Collections.emptyList())));
    // Upper bound, in milliseconds, on how long continuePolling() keeps answering true; a null value makes it answer false.
    private final Long maxWaitMillis;
    private final Consumer<byte[], byte[]> kafkaConsumer;
    private final ComponentLog logger;
    // When non-null, records are concatenated into bundles separated by these bytes (see processRecords).
    private final byte[] demarcatorBytes;
    private final String keyEncoding;
    // securityProtocol and bootstrapServers are only used to build provenance transit URIs.
    private final String securityProtocol;
    private final String bootstrapServers;
    private final RecordSetWriterFactory writerFactory;
    private final RecordReaderFactory readerFactory;
    // Character set used to decode Kafka header values into strings.
    private final Charset headerCharacterSet;
    // Headers whose names fully match this pattern are copied to FlowFile attributes; null disables header copying.
    private final Pattern headerNamePattern;
    // When true, the Kafka message key is part of the bundle identity.
    private final boolean separateByKey;
    // When true, commit() also commits the consumed offsets to Kafka.
    private final boolean commitOffsets;
    private final OutputStrategy outputStrategy;
    private final String keyFormat;
    private final RecordReaderFactory keyReaderFactory;
    // Set once an unexpected failure occurs; a poisoned lease aborts instead of committing.
    private boolean poisoned = false;
    // Open bundles keyed by their identity (partition, schema, attributes, optional key).
    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<BundleInformation, BundleTracker>();
    // Next offset to commit for each partition that produced records since the last commit.
    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<TopicPartition, OffsetAndMetadata>();
    // System.nanoTime() at the first continuePolling() call of the current cycle; -1 means "not started".
    private long leaseStartNanos = -1L;
    private boolean lastPollEmpty = false;
    // Number of Kafka messages processed since the last reset.
    private int totalMessages = 0;
    // Fields and schema of the "metadata" sub-record produced by toWrapperRecordMetadata.
    private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
    private static final RecordField FIELD_OFFSET = new RecordField("offset", RecordFieldType.LONG.getDataType());
    private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
    private static final RecordSchema SCHEMA_WRAPPER = new SimpleRecordSchema(Arrays.asList(FIELD_TOPIC, FIELD_PARTITION, FIELD_OFFSET, FIELD_TIMESTAMP));

    /**
     * Creates a lease around the given Kafka consumer. Every argument is stored as-is;
     * no validation or copying is performed here.
     *
     * @param maxWaitMillis maximum polling duration in milliseconds, may be null
     * @param kafkaConsumer consumer that this lease polls, seeks and commits on
     * @param demarcatorBytes separator written between bundled messages, or null
     * @param keyEncoding encoding name used when exposing the message key as an attribute
     * @param securityProtocol security protocol used when building transit URIs
     * @param bootstrapServers bootstrap servers used when building transit URIs
     * @param readerFactory factory for readers of message values, or null
     * @param writerFactory factory for record writers, or null
     * @param logger component logger
     * @param headerCharacterSet character set used to decode header values
     * @param headerNamePattern pattern selecting headers copied to attributes, or null
     * @param separateByKey whether the message key is part of the bundle identity
     * @param commitMessageOffsets whether offsets are committed to Kafka on commit
     * @param outputStrategy output strategy setting
     * @param keyFormat key format setting used by the wrapper record
     * @param keyReaderFactory factory for readers of record-formatted keys
     */
    ConsumerLease(
            Long maxWaitMillis,
            Consumer<byte[], byte[]> kafkaConsumer,
            byte[] demarcatorBytes,
            String keyEncoding,
            String securityProtocol,
            String bootstrapServers,
            RecordReaderFactory readerFactory,
            RecordSetWriterFactory writerFactory,
            ComponentLog logger,
            Charset headerCharacterSet,
            Pattern headerNamePattern,
            boolean separateByKey,
            boolean commitMessageOffsets,
            OutputStrategy outputStrategy,
            String keyFormat,
            RecordReaderFactory keyReaderFactory) {
        // Kafka connection and polling settings
        this.kafkaConsumer = kafkaConsumer;
        this.maxWaitMillis = maxWaitMillis;
        this.commitOffsets = commitMessageOffsets;
        this.securityProtocol = securityProtocol;
        this.bootstrapServers = bootstrapServers;

        // Message bundling and key handling
        this.demarcatorBytes = demarcatorBytes;
        this.separateByKey = separateByKey;
        this.keyEncoding = keyEncoding;
        this.keyFormat = keyFormat;
        this.outputStrategy = outputStrategy;

        // Header handling
        this.headerCharacterSet = headerCharacterSet;
        this.headerNamePattern = headerNamePattern;

        // Record readers, writers and logging
        this.readerFactory = readerFactory;
        this.keyReaderFactory = keyReaderFactory;
        this.writerFactory = writerFactory;
        this.logger = logger;
    }

    /**
     * Returns the per-cycle bookkeeping (counters, timers, pending offsets and open bundles)
     * to its initial state. The poisoned flag is not touched.
     */
    private void resetInternalState() {
        this.totalMessages = 0;
        this.lastPollEmpty = false;
        this.leaseStartNanos = -1L;
        this.uncommittedOffsetsMap.clear();
        this.bundleMap.clear();
    }

    /**
     * Rebalance callback: logs the revoked partitions and then commits whatever has been
     * consumed so far.
     *
     * @param partitions partitions being taken away from this consumer
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        final Object[] logArguments = {partitions, this, this.kafkaConsumer};
        this.logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", logArguments);
        this.commit();
    }

    /**
     * Rebalance callback: only logs the newly assigned partitions.
     *
     * @param partitions partitions now assigned to this consumer
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        final Object[] logArguments = {partitions, this, this.kafkaConsumer};
        this.logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", logArguments);
    }

    /**
     * Returns the partitions explicitly assigned to this lease.
     *
     * @return always {@code null} in this base implementation
     *         (NOTE(review): presumably subclasses with explicit assignments override this - confirm)
     */
    public List<TopicPartition> getAssignedPartitions() {
        return null;
    }

    /**
     * Polls Kafka once (10 ms timeout), remembers whether the poll came back empty and
     * hands the records to {@link #processRecords}.
     *
     * A {@link ProcessException} is rethrown untouched; any other failure poisons the
     * lease before being rethrown.
     */
    void poll() {
        final Duration pollTimeout = Duration.ofMillis(10L);
        try {
            final ConsumerRecords<byte[], byte[]> polled = this.kafkaConsumer.poll(pollTimeout);
            this.lastPollEmpty = (polled.count() == 0);
            this.processRecords(polled);
        } catch (ProcessException processException) {
            throw processException;
        } catch (Throwable unexpected) {
            this.poison();
            throw unexpected;
        }
    }

    /**
     * Abandons the current cycle: seeks all assigned partitions back to their committed
     * positions, rolls back the process session when one is available, and clears the
     * internal bookkeeping.
     */
    void abort() {
        this.rollback(this.kafkaConsumer.assignment());

        final ProcessSession currentSession = this.getProcessSession();
        if (currentSession != null) {
            currentSession.rollback();
        }

        this.resetInternalState();
    }

    /**
     * Finishes all open bundles, transfers them to success and asynchronously commits the
     * process session; the Kafka offsets are committed from the session's success callback
     * when offset committing is enabled.
     *
     * @return {@code true} when a session commit was requested; {@code false} when there was
     *         nothing to commit or the lease was poisoned (in which case it is aborted)
     * @throws ProcessException when finishing a bundle fails with an {@link IOException}
     */
    boolean commit() {
        // Nothing was consumed since the last commit: just clear state.
        if (this.uncommittedOffsetsMap.isEmpty()) {
            this.resetInternalState();
            return false;
        }
        // A poisoned lease must not commit anything; roll everything back instead.
        if (this.isPoisoned()) {
            this.abort();
            return false;
        }
        try {
            Collection<FlowFile> bundledFlowFiles = this.getBundles();
            if (!bundledFlowFiles.isEmpty()) {
                this.getProcessSession().transfer(bundledFlowFiles, ConsumeKafkaRecord_3.REL_SUCCESS);
                if (this.logger.isDebugEnabled()) {
                    for (FlowFile flowFile : bundledFlowFiles) {
                        String recordCountAttribute = flowFile.getAttribute("record.count");
                        String recordCount = recordCountAttribute == null ? "1" : recordCountAttribute;
                        this.logger.debug("Transferred {} with {} records, max offset of {}", new Object[]{flowFile, recordCount, flowFile.getAttribute("kafka.max.offset")});
                    }
                }
            }
            // Snapshot the offsets and the assignment now: the callbacks below run later and
            // resetInternalState() clears the live map.
            HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<TopicPartition, OffsetAndMetadata>(this.uncommittedOffsetsMap);
            Set assignedPartitions = this.kafkaConsumer.assignment();
            this.getProcessSession().commitAsync(() -> {
                // Success callback: the session is committed, so the offsets can be committed too.
                if (this.commitOffsets) {
                    this.kafkaConsumer.commitSync(offsetsMap);
                }
                this.resetInternalState();
            }, failureCause -> {
                // Failure callback: seek back so the same records are consumed again.
                this.logger.error("Failed to commit ProcessSession after consuming records from Kafka. Will rollback Kafka Offsets", failureCause);
                this.resetInternalState();
                this.rollback(assignedPartitions);
            });
            return true;
        }
        catch (IOException ioe) {
            this.poison();
            this.logger.error("Failed to finish writing out FlowFile bundle", (Throwable)ioe);
            throw new ProcessException((Throwable)ioe);
        }
        catch (KafkaException kex) {
            this.poison();
            this.logger.warn("Duplicates are likely as we were able to commit the process session but received an exception from Kafka while committing offsets.");
            throw kex;
        }
        catch (Throwable t) {
            this.poison();
            throw t;
        }
    }

    /**
     * Decides whether another poll should be attempted in the current cycle.
     *
     * Polling stops when the previous poll returned nothing, when no maximum wait is
     * configured or it has been exceeded, when more than 200 bundles are open, or when
     * 1000 or more messages have been processed. The first call that gets past the
     * empty-poll check starts the cycle timer.
     *
     * @return {@code true} when the caller should poll again
     */
    boolean continuePolling() {
        if (this.lastPollEmpty) {
            return false;
        }

        if (this.leaseStartNanos < 0L) {
            this.leaseStartNanos = System.nanoTime();
        }
        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos);

        final boolean withinMaxWait = this.maxWaitMillis != null && elapsedMillis <= this.maxWaitMillis;
        final boolean bundleCountAcceptable = this.bundleMap.size() <= 200;
        return withinMaxWait && bundleCountAcceptable && this.totalMessages < 1000;
    }

    /** Marks this lease as poisoned; the flag is never cleared within this class. */
    private void poison() {
        this.poisoned = true;
    }

    /**
     * @return {@code true} once {@code poison()} has been called on this lease
     */
    boolean isPoisoned() {
        return this.poisoned;
    }

    /** Delegates to the underlying Kafka consumer's {@code wakeup()}. */
    public void wakeup() {
        this.kafkaConsumer.wakeup();
    }

    /**
     * Clears the lease's internal bookkeeping. This implementation does not close the
     * Kafka consumer itself.
     */
    @Override
    public void close() {
        this.resetInternalState();
    }

    /**
     * Supplies the process session used to create, write and transfer FlowFiles.
     *
     * @return the session to use; {@code abort()} tolerates a {@code null} result
     */
    public abstract ProcessSession getProcessSession();

    /** Invoked when this lease hits a failure (e.g. schema lookup) after which processing should back off. */
    public abstract void yield();

    /**
     * Writes the polled records out, partition by partition, using the demarcated, record
     * oriented or one-FlowFile-per-message strategy depending on configuration, and records
     * the next offset to commit for every partition that produced messages.
     *
     * @param records records returned by a single poll
     */
    private void processRecords(ConsumerRecords<byte[], byte[]> records) {
        for (final TopicPartition partition : records.partitions()) {
            final List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
            if (messages.isEmpty()) {
                continue;
            }

            // Highest offset seen in this batch; the offset to commit is the one after it.
            long maxOffset = Long.MIN_VALUE;
            for (final ConsumerRecord<byte[], byte[]> message : messages) {
                maxOffset = Math.max(maxOffset, message.offset());
            }

            if (this.demarcatorBytes != null) {
                this.writeDemarcatedData(this.getProcessSession(), messages, partition);
            } else if (this.readerFactory != null && this.writerFactory != null) {
                this.writeRecordData(this.getProcessSession(), messages, partition);
            } else {
                for (final ConsumerRecord<byte[], byte[]> message : messages) {
                    this.writeData(this.getProcessSession(), message, partition);
                }
            }

            this.totalMessages += messages.size();
            this.uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
        }
    }

    /**
     * Converts a Kafka message key to its string form.
     *
     * @param key raw key bytes, may be null
     * @param encoding encoding name; only the HEX and UTF8 values are recognised
     * @return the hex or UTF-8 rendering of the key, or {@code null} when the key is null
     *         or the encoding is not one of the recognised values
     */
    private static String encodeKafkaKey(byte[] key, String encoding) {
        if (key == null) {
            return null;
        }

        final boolean hexRequested = KeyEncoding.HEX.getValue().equals(encoding);
        if (hexRequested) {
            return DatatypeConverter.printHexBinary(key);
        }

        final boolean utf8Requested = KeyEncoding.UTF8.getValue().equals(encoding);
        return utf8Requested ? new String(key, StandardCharsets.UTF_8) : null;
    }

    /**
     * Finalises every open bundle and collects the FlowFiles that should be transferred.
     *
     * @return FlowFiles of all bundles that {@link #processBundle} accepted
     * @throws IOException when finishing a bundle's record set fails
     */
    private Collection<FlowFile> getBundles() throws IOException {
        final List<FlowFile> completed = new ArrayList<>();
        for (final BundleTracker bundle : this.bundleMap.values()) {
            // processBundle may replace bundle.flowFile, so read it only afterwards.
            if (this.processBundle(bundle)) {
                completed.add(bundle.flowFile);
            }
        }
        return completed;
    }

    /**
     * Finalises a single bundle. For record-oriented bundles the record set is finished and
     * the writer closed; a bundle that ended up with zero records has its FlowFile removed
     * from the session. Kafka attributes are then added to the FlowFile.
     *
     * @param bundle bundle to finalise
     * @return {@code true} when the bundle's FlowFile should be transferred, {@code false}
     *         when it was removed because it contained no records
     * @throws IOException when finishing or closing the record writer fails
     */
    private boolean processBundle(BundleTracker bundle) throws IOException {
        final RecordSetWriter recordWriter = bundle.recordWriter;
        if (recordWriter == null) {
            // Demarcated bundle: nothing to finish, only attributes to add.
            this.populateAttributes(bundle);
            return true;
        }

        final WriteResult result;
        try {
            result = recordWriter.finishRecordSet();
        } finally {
            recordWriter.close();
        }

        if (result.getRecordCount() == 0) {
            this.getProcessSession().remove(bundle.flowFile);
            return false;
        }

        final Map<String, String> writerAttributes = new HashMap<>(result.getAttributes());
        writerAttributes.put(CoreAttributes.MIME_TYPE.key(), recordWriter.getMimeType());
        bundle.flowFile = this.getProcessSession().putAllAttributes(bundle.flowFile, writerAttributes);

        this.populateAttributes(bundle);
        return true;
    }

    /**
     * Emits one Kafka message as its own FlowFile and transfers it to success. A message
     * with a null value produces an empty FlowFile carrying the {@code kafka.tombstone}
     * attribute.
     *
     * @param session session used to create and transfer the FlowFile
     * @param record Kafka message to emit
     * @param topicPartition partition the message came from
     */
    private void writeData(ProcessSession session, ConsumerRecord<byte[], byte[]> record, TopicPartition topicPartition) {
        FlowFile created = session.create();

        final BundleTracker singleMessage = new BundleTracker(record, topicPartition, this.keyEncoding);
        singleMessage.incrementRecordCount(1L, record.offset(), record.leaderEpoch().orElse(null));

        final byte[] payload = record.value();
        if (payload != null) {
            created = session.write(created, out -> out.write(payload));
        } else {
            created = session.putAttribute(created, "kafka.tombstone", Boolean.TRUE.toString());
        }
        created = session.putAllAttributes(created, this.getAttributes(record));

        singleMessage.updateFlowFile(created);
        this.populateAttributes(singleMessage);
        session.transfer(singleMessage.flowFile, ConsumeKafkaRecord_3.REL_SUCCESS);
    }

    /**
     * Appends the given messages to demarcated bundles. Messages are grouped by bundle
     * identity (partition, header-derived attributes and, optionally, key); each group is
     * appended to the matching open bundle or to a newly created one, with the demarcator
     * written between consecutive messages.
     *
     * @param session session used to create and append to FlowFiles
     * @param records messages from a single partition
     * @param topicPartition partition the messages came from
     */
    private void writeDemarcatedData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> recordsByBundle = records.stream()
                .collect(Collectors.groupingBy(rec -> new BundleInformation(
                        topicPartition, null, this.getAttributes(rec), this.separateByKey ? rec.key() : null)));

        for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> group : recordsByBundle.entrySet()) {
            final BundleInformation bundleInfo = group.getKey();
            final List<ConsumerRecord<byte[], byte[]>> bundleRecords = group.getValue();

            // An already-open bundle has content, so its next message must be preceded by a demarcator.
            final BundleTracker existing = this.bundleMap.get(bundleInfo);
            final boolean appendingToExisting = existing != null;
            final BundleTracker tracker;
            if (appendingToExisting) {
                tracker = existing;
            } else {
                tracker = new BundleTracker(bundleRecords.get(0), topicPartition, this.keyEncoding);
                FlowFile created = session.create();
                created = session.putAllAttributes(created, bundleInfo.attributes);
                tracker.updateFlowFile(created);
            }
            final FlowFile target = tracker.flowFile;

            long highestOffset = bundleRecords.get(0).offset();
            int highestEpoch = -1;
            for (final ConsumerRecord<byte[], byte[]> rec : bundleRecords) {
                highestOffset = Math.max(highestOffset, rec.offset());
                highestEpoch = Math.max(rec.leaderEpoch().orElse(highestEpoch), highestEpoch);
            }
            final Integer epochOrNull = highestEpoch >= 0 ? Integer.valueOf(highestEpoch) : null;
            tracker.incrementRecordCount(bundleRecords.size(), highestOffset, epochOrNull);

            final FlowFile appended = session.append(target, out -> {
                boolean writeDemarcator = appendingToExisting;
                for (final ConsumerRecord<byte[], byte[]> rec : bundleRecords) {
                    if (writeDemarcator) {
                        out.write(this.demarcatorBytes);
                    }
                    final byte[] payload = rec.value();
                    if (payload != null) {
                        out.write(payload);
                    }
                    writeDemarcator = true;
                }
            });

            tracker.updateFlowFile(appended);
            this.bundleMap.put(bundleInfo, tracker);
        }
    }

    /**
     * Routes an unparseable message to parse.failure using the default reader-failure
     * log message.
     *
     * @param consumerRecord message that could not be parsed
     * @param session session used to create the failure FlowFile
     * @param cause failure to log, may be null
     */
    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession session, Exception cause) {
        final String defaultMessage = "Failed to parse message from Kafka using the configured Record Reader. Will route message as its own FlowFile to the 'parse.failure' relationship";
        this.handleParseFailure(consumerRecord, session, cause, defaultMessage);
    }

    /**
     * Writes the raw message to a new FlowFile, reports a provenance receive event for it,
     * transfers it to the parse.failure relationship, logs the failure and bumps the
     * "Parse Failures" counter.
     *
     * @param consumerRecord message that could not be processed
     * @param session session used to create and transfer the failure FlowFile
     * @param cause failure to log, may be null
     * @param message log message describing the failure
     */
    private void handleParseFailure(ConsumerRecord<byte[], byte[]> consumerRecord, ProcessSession session, Exception cause, String message) {
        final Map<String, String> failureAttributes = this.getAttributes(consumerRecord);
        failureAttributes.put("kafka.offset", String.valueOf(consumerRecord.offset()));
        failureAttributes.put("kafka.timestamp", String.valueOf(consumerRecord.timestamp()));

        FlowFile failed = session.create();
        final byte[] payload = consumerRecord.value();
        if (payload != null) {
            failed = session.write(failed, out -> out.write(payload));
        }
        failed = session.putAllAttributes(failed, failureAttributes);

        final String transitUri = StandardTransitUriProvider.getTransitUri(this.securityProtocol, this.bootstrapServers, consumerRecord.topic());
        session.getProvenanceReporter().receive(failed, transitUri);
        session.transfer(failed, ConsumeKafkaRecord_3.REL_PARSE_FAILURE);

        if (cause != null) {
            this.logger.error(message, (Throwable) cause);
        } else {
            this.logger.error(message);
        }
        session.adjustCounter("Parse Failures", 1L, false);
    }

    /**
     * Builds the FlowFile attributes derived from a Kafka message: its partition and topic,
     * plus every header whose name fully matches the configured pattern and whose value is
     * non-null (decoded with the configured header character set).
     *
     * @param consumerRecord message to describe
     * @return a new, mutable attribute map
     */
    protected Map<String, String> getAttributes(ConsumerRecord<?, ?> consumerRecord) {
        final Map<String, String> result = new HashMap<>();
        result.put("kafka.partition", String.valueOf(consumerRecord.partition()));
        result.put("kafka.topic", consumerRecord.topic());

        if (this.headerNamePattern != null) {
            for (final Header header : consumerRecord.headers()) {
                final String name = header.key();
                final byte[] rawValue = header.value();
                if (this.headerNamePattern.matcher(name).matches() && rawValue != null) {
                    result.put(name, new String(rawValue, this.headerCharacterSet));
                }
            }
        }
        return result;
    }

    /*
     * FIXME: the body of this method was NOT recovered by the decompiler (CFR 0.152 reported
     * "org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once"). As written,
     * the method unconditionally throws IllegalStateException, so the record-oriented branch of
     * processRecords() cannot work until the real implementation is restored from the
     * original sources.
     *
     * NOTE(review): the private helpers writeRecord, toWrapperRecord and closeWriter have no
     * other caller in this file, so they were presumably invoked from the missing body -
     * confirm against the original source before re-implementing.
     */
    private void writeRecordData(ProcessSession session, List<ConsumerRecord<byte[], byte[]>> records, TopicPartition topicPartition) {
        // Placeholder emitted by the decompiler in place of the real body.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Writes a single record to the bundle matching its partition, schema, attributes and
     * (optionally) key, creating the bundle - FlowFile, output stream and record writer -
     * on first use.
     *
     * A {@link RuntimeException} thrown while writing the record is treated as a parse
     * failure: the original message is routed to parse.failure and the bundle's counters
     * are left untouched.
     *
     * @param session session used to create FlowFiles
     * @param consumerRecord Kafka message the record originated from
     * @param topicPartition partition the message came from
     * @param record record to write
     * @param attributes attributes for a newly created bundle FlowFile; also part of the bundle identity
     * @return the writer of the bundle the record was written to
     * @throws SchemaNotFoundException if the record writer cannot be created for the schema
     * @throws IOException if creating the writer or writing the record fails
     * @throws ProcessException if the write schema cannot be obtained; the partition is
     *         rolled back and the lease yields before this is thrown
     */
    private RecordSetWriter writeRecord(ProcessSession session, ConsumerRecord<byte[], byte[]> consumerRecord, TopicPartition topicPartition, Record record, Map<String, String> attributes) throws SchemaNotFoundException, IOException {
        final RecordSchema recordSchema = record.getSchema();
        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, this.separateByKey ? consumerRecord.key() : null);

        BundleTracker tracker = this.bundleMap.get(bundleInfo);
        final RecordSetWriter writer;
        if (tracker == null) {
            FlowFile flowFile = session.create();
            flowFile = session.putAllAttributes(flowFile, attributes);

            // Resolve the schema BEFORE opening the FlowFile's output stream, so that a failing
            // schema lookup does not leave a dangling open stream behind.
            final RecordSchema writeSchema;
            try {
                writeSchema = this.writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
            } catch (Exception e) {
                this.logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", (Throwable) e);
                this.rollback(topicPartition);
                this.yield();
                throw new ProcessException((Throwable) e);
            }

            final OutputStream rawOut = session.write(flowFile);
            writer = this.writerFactory.createWriter(this.logger, writeSchema, rawOut, flowFile);
            writer.beginRecordSet();

            tracker = new BundleTracker(consumerRecord, topicPartition, this.keyEncoding, writer);
            tracker.updateFlowFile(flowFile);
            this.bundleMap.put(bundleInfo, tracker);
        } else {
            writer = tracker.recordWriter;
        }

        try {
            writer.write(record);
        } catch (RuntimeException re) {
            this.handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. Will route message as its own FlowFile to the 'parse.failure' relationship");
            return writer;
        }

        tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
        session.adjustCounter("Records Received", 1L, false);
        return writer;
    }

    /**
     * Builds the wrapper record made of four fields - key, value, headers and metadata -
     * for one Kafka message.
     *
     * @param consumerRecord Kafka message providing key, headers and metadata
     * @param record parsed message value, may be null
     * @return a record whose schema and values are assembled from the four parts
     * @throws IOException if reading a record-formatted key fails
     * @throws SchemaNotFoundException if the key reader cannot determine a schema
     * @throws MalformedRecordException if a record-formatted key cannot be parsed
     */
    private MapRecord toWrapperRecord(ConsumerRecord<byte[], byte[]> consumerRecord, Record record) throws IOException, SchemaNotFoundException, MalformedRecordException {
        // Order matters: it determines the field order of the resulting schema.
        final List<Tuple<RecordField, Object>> parts = new ArrayList<>(4);
        parts.add(this.toWrapperRecordKey(consumerRecord));
        parts.add(this.toWrapperRecordValue(record));
        parts.add(this.toWrapperRecordHeaders(consumerRecord));
        parts.add(this.toWrapperRecordMetadata(consumerRecord));

        final List<RecordField> fields = new ArrayList<>(parts.size());
        final Map<String, Object> values = new HashMap<>();
        for (final Tuple<RecordField, Object> part : parts) {
            final RecordField field = part.getKey();
            fields.add(field);
            values.put(field.getFieldName(), part.getValue());
        }

        final RecordSchema rootSchema = new SimpleRecordSchema(fields);
        return new MapRecord(rootSchema, values);
    }

    /**
     * Produces the "key" field of the wrapper record according to the configured key format.
     *
     * <ul>
     *   <li>RECORD: the key bytes are parsed with the key reader; an empty key, or a reader
     *       that yields no record, results in an empty-schema record field with a null value.</li>
     *   <li>STRING: the key bytes are decoded as UTF-8 (an absent key becomes the empty string).</li>
     *   <li>anything else: the key is exposed as an array of bytes.</li>
     * </ul>
     *
     * @param consumerRecord Kafka message whose key is converted; a null key is treated as empty
     * @return the field descriptor paired with the converted key value
     * @throws IOException if reading the key fails
     * @throws SchemaNotFoundException if the key reader cannot determine a schema
     * @throws MalformedRecordException if the key cannot be parsed as a record
     */
    private Tuple<RecordField, Object> toWrapperRecordKey(ConsumerRecord<byte[], byte[]> consumerRecord) throws IOException, SchemaNotFoundException, MalformedRecordException {
        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();

        if (KeyFormat.RECORD.getValue().equals(this.keyFormat)) {
            if (key.length == 0) {
                return new Tuple<>(EMPTY_SCHEMA_KEY_RECORD_FIELD, null);
            }
            final Map<String, String> attributes = this.getAttributes(consumerRecord);
            // Resources are closed in reverse order: reader first, then the stream.
            try (InputStream is = new ByteArrayInputStream(key);
                 RecordReader reader = this.keyReaderFactory.createRecordReader(attributes, is, (long) key.length, this.logger)) {
                final Record record = reader.nextRecord();
                if (record == null) {
                    // The reader found no record in the key bytes: treat it like an empty key
                    // instead of failing with a NullPointerException on record.getSchema().
                    return new Tuple<>(EMPTY_SCHEMA_KEY_RECORD_FIELD, null);
                }
                final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                return new Tuple<>(recordField, record);
            }
        }

        if (KeyFormat.STRING.getValue().equals(this.keyFormat)) {
            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
            return new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
        }

        final RecordField recordField = new RecordField("key", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
        return new Tuple<>(recordField, ArrayUtils.toObject(key));
    }

    /**
     * Produces the "value" field of the wrapper record.
     *
     * @param record parsed message value, may be null
     * @return a record-typed field (built from the record's schema, or from a null schema
     *         when there is no record) paired with the record itself
     */
    private Tuple<RecordField, Object> toWrapperRecordValue(Record record) {
        RecordSchema valueSchema = null;
        if (record != null) {
            valueSchema = record.getSchema();
        }
        final RecordField valueField = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(valueSchema));
        return new Tuple<>(valueField, record);
    }

    /**
     * Produces the "headers" field of the wrapper record: a string-to-string map of every
     * Kafka header, decoded with the configured header character set.
     *
     * A header with a null value is kept in the map with a null value rather than causing
     * a NullPointerException during decoding.
     *
     * @param consumerRecord Kafka message whose headers are converted
     * @return the map-typed field descriptor paired with the header map
     */
    private Tuple<RecordField, Object> toWrapperRecordHeaders(ConsumerRecord<byte[], byte[]> consumerRecord) {
        final RecordField recordField = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
        final Map<String, String> headers = new HashMap<>();
        for (final Header header : consumerRecord.headers()) {
            final byte[] rawValue = header.value();
            final String decoded = rawValue == null ? null : new String(rawValue, this.headerCharacterSet);
            headers.put(header.key(), decoded);
        }
        return new Tuple<>(recordField, headers);
    }

    /**
     * Produces the "metadata" field of the wrapper record, holding the message's topic,
     * partition, offset and timestamp.
     *
     * @param consumerRecord Kafka message to describe
     * @return the record-typed field descriptor paired with the metadata record
     */
    private Tuple<RecordField, Object> toWrapperRecordMetadata(ConsumerRecord<byte[], byte[]> consumerRecord) {
        final Map<String, Object> values = new HashMap<>();
        values.put("topic", consumerRecord.topic());
        values.put("partition", consumerRecord.partition());
        values.put("offset", consumerRecord.offset());
        values.put("timestamp", consumerRecord.timestamp());

        final MapRecord metadataRecord = new MapRecord(SCHEMA_WRAPPER, values);
        final RecordField metadataField = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_WRAPPER));
        return new Tuple<>(metadataField, metadataRecord);
    }

    /**
     * Closes the given writer on a best-effort basis: a null writer is ignored and any
     * failure while closing is logged as a warning instead of being propagated.
     *
     * @param writer writer to close, may be null
     */
    private void closeWriter(RecordSetWriter writer) {
        if (writer == null) {
            return;
        }
        try {
            writer.close();
        } catch (Exception closeFailure) {
            this.logger.warn("Failed to close Record Writer", (Throwable) closeFailure);
        }
    }

    /**
     * Rolls a single partition back to its last committed position.
     *
     * @param topicPartition partition to roll back
     */
    private void rollback(TopicPartition topicPartition) {
        final Set<TopicPartition> onlyThisPartition = Collections.singleton(topicPartition);
        this.rollback(onlyThisPartition);
    }

    /**
     * Seeks each given partition back to its last committed offset, or to the beginning
     * when nothing has been committed for it yet. Any failure is logged and poisons the
     * lease rather than being propagated.
     *
     * @param topicPartitions partitions to roll back
     */
    private void rollback(Set<TopicPartition> topicPartitions) {
        try {
            final Map<TopicPartition, OffsetAndMetadata> committed = this.kafkaConsumer.committed(topicPartitions);
            for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
                final TopicPartition partition = entry.getKey();
                final OffsetAndMetadata lastCommitted = entry.getValue();
                if (lastCommitted != null) {
                    this.kafkaConsumer.seek(partition, lastCommitted.offset());
                    this.logger.info("Rolling back offsets so that {}-{} has offset of {}", new Object[]{partition.topic(), partition.partition(), lastCommitted.offset()});
                } else {
                    // No committed offset for this partition: start over from the first record.
                    this.kafkaConsumer.seekToBeginning(Collections.singleton(partition));
                    this.logger.info("Rolling back offsets so that {}-{} it is at the beginning", new Object[]{partition.topic(), partition.partition()});
                }
            }
        } catch (Exception rollbackFailure) {
            this.logger.warn("Attempted to rollback Kafka message offset but was unable to do so", (Throwable) rollbackFailure);
            this.poison();
        }
    }

    /**
     * Adds the Kafka bookkeeping attributes (offsets, timestamp, leader epoch, consumer group,
     * key and counts) to the bundle's FlowFile and reports a provenance receive event for it.
     *
     * The key attribute is only added when the bundle has a key, holds a single message or is
     * separated by key, and the key encoding is not "do not add". A count attribute is only
     * added for bundles with more than one record.
     *
     * @param tracker bundle whose FlowFile is updated; its FlowFile reference is replaced
     */
    private void populateAttributes(BundleTracker tracker) {
        final Map<String, String> attrs = new HashMap<>();
        attrs.put("kafka.offset", Long.toString(tracker.initialOffset));
        attrs.put("kafka.timestamp", Long.toString(tracker.initialTimestamp));
        attrs.put("kafka.max.offset", Long.toString(tracker.maxOffset));
        if (tracker.leaderEpoch != null) {
            attrs.put("kafka.leader.epoch", tracker.leaderEpoch.toString());
        }
        attrs.put("kafka.consumer.id", this.kafkaConsumer.groupMetadata().groupId());
        attrs.put("kafka.consumer.offsets.committed", Boolean.toString(this.commitOffsets));

        final boolean keyIsMeaningful = tracker.key != null
                && (tracker.totalRecords == 1L || this.separateByKey)
                && !this.keyEncoding.equalsIgnoreCase(KeyEncoding.DO_NOT_ADD.getValue());
        if (keyIsMeaningful) {
            attrs.put("kafka.key", tracker.key);
        }

        if (tracker.totalRecords > 1L) {
            // Demarcated bundles have no record writer and use a different attribute name.
            final String countAttribute = tracker.recordWriter == null ? "kafka.count" : "record.count";
            attrs.put(countAttribute, Long.toString(tracker.totalRecords));
        }

        final FlowFile updated = this.getProcessSession().putAllAttributes(tracker.flowFile, attrs);

        final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.leaseStartNanos);
        final String transitUri = StandardTransitUriProvider.getTransitUri(this.securityProtocol, this.bootstrapServers, tracker.topic);
        this.getProcessSession().getProvenanceReporter().receive(updated, transitUri, elapsedMillis);

        tracker.updateFlowFile(updated);
    }

    /**
     * Identity of a bundle: messages sharing the same partition, schema, attributes and
     * (optional) message key are written to the same FlowFile. Used as a map key, so
     * {@code equals} and {@code hashCode} cover all four components, comparing the key
     * bytes by content.
     */
    private static class BundleInformation {
        private final TopicPartition topicPartition;
        private final RecordSchema schema;
        private final Map<String, String> attributes;
        private final byte[] messageKey;

        public BundleInformation(TopicPartition topicPartition, RecordSchema schema, Map<String, String> attributes, byte[] messageKey) {
            this.topicPartition = topicPartition;
            this.schema = schema;
            this.attributes = attributes;
            this.messageKey = messageKey;
        }

        @Override
        public int hashCode() {
            final int componentHash = Objects.hash(this.topicPartition, this.schema, this.attributes);
            final int keyHash = Arrays.hashCode(this.messageKey);
            return 41 + componentHash + 37 * keyHash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            // instanceof is false for null, so this also covers the null case.
            if (!(obj instanceof BundleInformation)) {
                return false;
            }
            final BundleInformation that = (BundleInformation) obj;
            if (!Objects.equals(this.topicPartition, that.topicPartition)) {
                return false;
            }
            if (!Objects.equals(this.schema, that.schema)) {
                return false;
            }
            if (!Objects.equals(this.attributes, that.attributes)) {
                return false;
            }
            return Arrays.equals(this.messageKey, that.messageKey);
        }
    }

    /**
     * Mutable state of one open bundle: where it started (offset, timestamp, topic,
     * partition, encoded key), the optional record writer, the current FlowFile and the
     * running totals (record count, highest offset, highest leader epoch).
     */
    private static class BundleTracker {
        final long initialOffset;
        final long initialTimestamp;
        final int partition;
        final String topic;
        final String key;
        final RecordSetWriter recordWriter;
        FlowFile flowFile;
        long totalRecords = 0L;
        long maxOffset;
        Integer leaderEpoch;

        /** Creates a tracker without a record writer (used for non-record bundles). */
        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding) {
            this(initialRecord, topicPartition, keyEncoding, null);
        }

        /** Creates a tracker seeded from the first message of the bundle. */
        private BundleTracker(ConsumerRecord<byte[], byte[]> initialRecord, TopicPartition topicPartition, String keyEncoding, RecordSetWriter recordWriter) {
            final long firstOffset = initialRecord.offset();
            this.initialOffset = firstOffset;
            this.maxOffset = firstOffset;
            this.initialTimestamp = initialRecord.timestamp();
            this.partition = topicPartition.partition();
            this.topic = topicPartition.topic();
            this.recordWriter = recordWriter;
            this.key = ConsumerLease.encodeKafkaKey(initialRecord.key(), keyEncoding);
            this.leaderEpoch = initialRecord.leaderEpoch().orElse(null);
        }

        /**
         * Adds {@code count} records to the total and raises the tracked maximum offset and
         * leader epoch when the supplied values are higher; a null epoch is ignored.
         */
        private void incrementRecordCount(long count, long maxOffset, Integer leaderEpoch) {
            this.totalRecords += count;
            if (maxOffset > this.maxOffset) {
                this.maxOffset = maxOffset;
            }
            if (leaderEpoch == null) {
                return;
            }
            if (this.leaderEpoch == null) {
                this.leaderEpoch = leaderEpoch;
            } else {
                this.leaderEpoch = Math.max(this.leaderEpoch, leaderEpoch);
            }
        }

        /** Replaces the FlowFile reference after the session returned an updated one. */
        private void updateFlowFile(FlowFile flowFile) {
            this.flowFile = flowFile;
        }
    }
}

