/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.InFlightMessageTracker;
import org.apache.nifi.processors.kafka.pubsub.PublishMetadataStrategy;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;

public class PublisherLease
implements Closeable {
    private final ComponentLog logger;
    private final Producer<byte[], byte[]> producer;
    private final int maxMessageSize;
    private final long maxAckWaitMillis;
    private final boolean useTransactions;
    private final Pattern attributeNameRegex;
    private final Charset headerCharacterSet;
    private final PublishStrategy publishStrategy;
    private final RecordSetWriterFactory recordKeyWriterFactory;
    private volatile boolean poisoned = false;
    private final AtomicLong messagesSent = new AtomicLong(0L);
    private volatile boolean transactionsInitialized = false;
    private volatile boolean activeTransaction = false;
    private InFlightMessageTracker tracker;
    private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
    private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
    private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
    private static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(Arrays.asList(FIELD_TOPIC, FIELD_PARTITION, FIELD_TIMESTAMP));
    private static final RecordField FIELD_METADATA = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
    private static final RecordField FIELD_HEADERS = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));

    public PublisherLease(Producer<byte[], byte[]> producer, int maxMessageSize, long maxAckWaitMillis, ComponentLog logger, boolean useTransactions, Pattern attributeNameRegex, Charset headerCharacterSet, PublishStrategy publishStrategy, RecordSetWriterFactory recordKeyWriterFactory) {
        this.producer = producer;
        this.maxMessageSize = maxMessageSize;
        this.logger = logger;
        this.maxAckWaitMillis = maxAckWaitMillis;
        this.useTransactions = useTransactions;
        this.attributeNameRegex = attributeNameRegex;
        this.headerCharacterSet = headerCharacterSet;
        this.publishStrategy = publishStrategy;
        this.recordKeyWriterFactory = recordKeyWriterFactory;
    }

    protected void poison() {
        this.poisoned = true;
    }

    public boolean isPoisoned() {
        return this.poisoned;
    }

    void beginTransaction() {
        if (!this.useTransactions) {
            return;
        }
        try {
            if (!this.transactionsInitialized) {
                this.producer.initTransactions();
                this.transactionsInitialized = true;
            }
            this.producer.beginTransaction();
            this.activeTransaction = true;
        }
        catch (Exception e) {
            this.poison();
            throw e;
        }
    }

    void rollback() {
        if (!this.useTransactions || !this.activeTransaction) {
            return;
        }
        try {
            this.producer.abortTransaction();
        }
        catch (Exception e) {
            this.poison();
            throw e;
        }
        this.activeTransaction = false;
    }

    void fail(FlowFile flowFile, Exception cause) {
        this.getTracker().fail(flowFile, cause);
        this.rollback();
    }

    void publish(FlowFile flowFile, InputStream flowFileContent, byte[] messageKey, byte[] demarcatorBytes, String topic, Integer partition) throws IOException {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        try {
            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
                if (flowFile.getSize() > (long)this.maxMessageSize) {
                    this.tracker.fail(flowFile, (Exception)((Object)new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxMessageSize + " bytes.")));
                    return;
                }
                byte[] messageContent = new byte[(int)flowFile.getSize()];
                StreamUtils.fillBuffer((InputStream)flowFileContent, (byte[])messageContent);
                this.publish(flowFile, messageKey, messageContent, topic, this.tracker, partition);
                return;
            }
            try (StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, this.maxMessageSize);){
                byte[] messageContent;
                while ((messageContent = demarcator.nextToken()) != null) {
                    this.publish(flowFile, messageKey, messageContent, topic, this.tracker, partition);
                    if (!this.tracker.isFailed(flowFile)) continue;
                    return;
                }
            }
            catch (TokenTooLargeException ttle) {
                this.tracker.fail(flowFile, (Exception)((Object)ttle));
            }
        }
        catch (Exception e) {
            this.tracker.fail(flowFile, e);
            this.poison();
            throw e;
        }
    }

    void publish(FlowFile flowFile, RecordSet recordSet, RecordSetWriterFactory writerFactory, RecordSchema schema, String messageKeyField, String explicitTopic, Function<Record, Integer> partitioner, PublishMetadataStrategy metadataStrategy) throws IOException {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
        int recordCount = 0;
        try {
            Record record;
            while ((record = recordSet.next()) != null) {
                Integer partition;
                String topic;
                byte[] messageKey;
                byte[] messageContent;
                List<Header> headers;
                ++recordCount;
                baos.reset();
                if (PublishStrategy.USE_WRAPPER.equals((Object)this.publishStrategy)) {
                    headers = this.toHeadersWrapper(record.getValue("headers"));
                    Object key = record.getValue("key");
                    Object value = record.getValue("value");
                    messageContent = this.toByteArray("value", value, writerFactory, flowFile);
                    messageKey = this.toByteArray("key", key, this.recordKeyWriterFactory, flowFile);
                    if (metadataStrategy == PublishMetadataStrategy.USE_RECORD_METADATA) {
                        Object metadataObject = record.getValue("metadata");
                        if (metadataObject instanceof Record) {
                            Record metadataRecord = (Record)metadataObject;
                            String recordTopic = metadataRecord.getAsString("topic");
                            topic = recordTopic == null ? explicitTopic : recordTopic;
                            try {
                                partition = metadataRecord.getAsInt("partition");
                            }
                            catch (Exception e) {
                                this.logger.warn("Encountered invalid partition for record in {}; will use configured partitioner for Record", new Object[]{flowFile});
                                partition = partitioner == null ? null : partitioner.apply(record);
                            }
                        } else {
                            topic = explicitTopic;
                            partition = partitioner == null ? null : partitioner.apply(record);
                        }
                    } else {
                        topic = explicitTopic;
                        partition = partitioner == null ? null : partitioner.apply(record);
                    }
                } else {
                    Map additionalAttributes;
                    try (RecordSetWriter writer = writerFactory.createWriter(this.logger, schema, (OutputStream)baos, flowFile);){
                        WriteResult writeResult = writer.write(record);
                        additionalAttributes = writeResult.getAttributes();
                        writer.flush();
                    }
                    headers = this.toHeaders(flowFile, additionalAttributes);
                    messageContent = baos.toByteArray();
                    messageKey = this.getMessageKey(flowFile, writerFactory, record.getValue(messageKeyField));
                    topic = explicitTopic;
                    partition = partitioner == null ? null : partitioner.apply(record);
                }
                this.publish(flowFile, headers, messageKey, messageContent, topic, this.tracker, partition);
                if (!this.tracker.isFailed(flowFile)) continue;
                return;
            }
            if (recordCount == 0) {
                this.tracker.trackEmpty(flowFile);
            }
        }
        catch (TokenTooLargeException ttle) {
            this.tracker.fail(flowFile, (Exception)((Object)ttle));
        }
        catch (SchemaNotFoundException | MalformedRecordException snfe) {
            throw new IOException(snfe);
        }
        catch (Exception e) {
            this.tracker.fail(flowFile, e);
            this.poison();
            throw e;
        }
    }

    private List<Header> toHeadersWrapper(Object fieldHeaders) {
        ArrayList<Header> headers = new ArrayList<Header>();
        if (fieldHeaders instanceof Record) {
            Record recordHeaders = (Record)fieldHeaders;
            for (String fieldName : recordHeaders.getRawFieldNames()) {
                String fieldValue = recordHeaders.getAsString(fieldName);
                headers.add((Header)new RecordHeader(fieldName, fieldValue.getBytes(StandardCharsets.UTF_8)));
            }
        }
        return headers;
    }

    private Record toWrapperRecord(Record record, List<Header> headers, String messageKeyField, String topic) {
        Record recordKey = (Record)record.getValue(messageKeyField);
        RecordSchema recordSchemaKey = recordKey == null ? null : recordKey.getSchema();
        RecordField fieldKey = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(recordSchemaKey));
        RecordField fieldValue = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
        SimpleRecordSchema schemaWrapper = new SimpleRecordSchema(Arrays.asList(FIELD_METADATA, FIELD_HEADERS, fieldKey, fieldValue));
        HashMap<String, Object> valuesMetadata = new HashMap<String, Object>();
        valuesMetadata.put("topic", topic);
        valuesMetadata.put("timestamp", this.getTimestamp());
        MapRecord recordMetadata = new MapRecord(SCHEMA_METADATA, valuesMetadata);
        HashMap<String, String> valuesHeaders = new HashMap<String, String>();
        for (Header header : headers) {
            valuesHeaders.put(header.key(), new String(header.value(), this.headerCharacterSet));
        }
        HashMap<String, Object> valuesWrapper = new HashMap<String, Object>();
        valuesWrapper.put("metadata", recordMetadata);
        valuesWrapper.put("headers", valuesHeaders);
        valuesWrapper.put("key", record.getValue(messageKeyField));
        valuesWrapper.put("value", record);
        return new MapRecord((RecordSchema)schemaWrapper, valuesWrapper);
    }

    protected long getTimestamp() {
        return System.currentTimeMillis();
    }

    private List<Header> toHeaders(FlowFile flowFile, Map<String, ?> additionalAttributes) {
        if (this.attributeNameRegex == null) {
            return Collections.emptyList();
        }
        ArrayList<Header> headers = new ArrayList<Header>();
        for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches()) continue;
            headers.add((Header)new RecordHeader((String)entry.getKey(), ((String)entry.getValue()).getBytes(this.headerCharacterSet)));
        }
        for (Map.Entry<Object, Object> entry : additionalAttributes.entrySet()) {
            Object value;
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches() || (value = entry.getValue()) == null) continue;
            String valueString = value.toString();
            headers.add((Header)new RecordHeader((String)entry.getKey(), valueString.getBytes(this.headerCharacterSet)));
        }
        return headers;
    }

    private byte[] toByteArray(String name, Object object, RecordSetWriterFactory writerFactory, FlowFile flowFile) throws IOException, SchemaNotFoundException, MalformedRecordException {
        if (object == null) {
            return null;
        }
        if (object instanceof Record) {
            if (writerFactory == null) {
                throw new MalformedRecordException("Record has a key that is itself a record, but the 'Record Key Writer' of the processor was not configured. If Records are expected to have a Record as the key, the 'Record Key Writer' property must be set.");
            }
            Record record = (Record)object;
            RecordSchema schema = record.getSchema();
            try (ByteArrayOutputStream baos = new ByteArrayOutputStream();){
                byte[] byArray;
                block17: {
                    RecordSetWriter writer = writerFactory.createWriter(this.logger, schema, (OutputStream)baos, flowFile);
                    try {
                        writer.write(record);
                        writer.flush();
                        byArray = baos.toByteArray();
                        if (writer == null) break block17;
                    }
                    catch (Throwable throwable) {
                        if (writer != null) {
                            try {
                                writer.close();
                            }
                            catch (Throwable throwable2) {
                                throwable.addSuppressed(throwable2);
                            }
                        }
                        throw throwable;
                    }
                    writer.close();
                }
                return byArray;
            }
        }
        if (object instanceof Byte[]) {
            Byte[] bytesUppercase = (Byte[])object;
            byte[] bytes = new byte[bytesUppercase.length];
            for (int i = 0; i < bytesUppercase.length; ++i) {
                bytes[i] = bytesUppercase[i];
            }
            return bytes;
        }
        if (object instanceof String) {
            String string = (String)object;
            return string.getBytes(StandardCharsets.UTF_8);
        }
        throw new MalformedRecordException(String.format("Couldn't convert %s record data to byte array.", name));
    }

    private byte[] getMessageKey(FlowFile flowFile, RecordSetWriterFactory writerFactory, Object keyValue) throws IOException, SchemaNotFoundException {
        byte[] messageKey;
        if (keyValue == null) {
            messageKey = null;
        } else if (keyValue instanceof byte[]) {
            messageKey = (byte[])keyValue;
        } else if (keyValue instanceof Byte[]) {
            Byte[] bytes = (Byte[])keyValue;
            byte[] bytesPrimitive = new byte[bytes.length];
            for (int i = 0; i < bytes.length; ++i) {
                bytesPrimitive[i] = bytes[i];
            }
            messageKey = bytesPrimitive;
        } else {
            if (keyValue instanceof Record) {
                Record keyRecord = (Record)keyValue;
                try (ByteArrayOutputStream os = new ByteArrayOutputStream(1024);){
                    try (RecordSetWriter writerKey = writerFactory.createWriter(this.logger, keyRecord.getSchema(), (OutputStream)os, flowFile);){
                        writerKey.write(keyRecord);
                        writerKey.flush();
                    }
                    messageKey = os.toByteArray();
                }
            }
            String keyString = keyValue.toString();
            messageKey = keyString.getBytes(StandardCharsets.UTF_8);
        }
        return messageKey;
    }

    private void addHeaders(FlowFile flowFile, Map<String, String> additionalAttributes, ProducerRecord<?, ?> record) {
        if (this.attributeNameRegex == null) {
            return;
        }
        Headers headers = record.headers();
        for (Map.Entry entry : flowFile.getAttributes().entrySet()) {
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches()) continue;
            headers.add((String)entry.getKey(), ((String)entry.getValue()).getBytes(this.headerCharacterSet));
        }
        for (Map.Entry<Object, Object> entry : additionalAttributes.entrySet()) {
            if (!this.attributeNameRegex.matcher((CharSequence)entry.getKey()).matches()) continue;
            headers.add((String)entry.getKey(), ((String)entry.getValue()).getBytes(this.headerCharacterSet));
        }
    }

    protected void publish(FlowFile flowFile, byte[] messageKey, byte[] messageContent, String topic, InFlightMessageTracker tracker, Integer partition) {
        List<Header> headers = this.toHeaders(flowFile, Collections.emptyMap());
        this.publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
    }

    protected void publish(final FlowFile flowFile, List<Header> headers, byte[] messageKey, byte[] messageContent, String topic, final InFlightMessageTracker tracker, Integer partition) {
        Integer moddedPartition = partition == null ? null : Integer.valueOf(Math.abs(partition) % this.producer.partitionsFor(topic).size());
        ProducerRecord record = new ProducerRecord(topic, moddedPartition, (Object)messageKey, (Object)messageContent, headers);
        this.producer.send(record, new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    tracker.incrementAcknowledgedCount(flowFile);
                } else {
                    tracker.fail(flowFile, exception);
                    PublisherLease.this.poison();
                }
            }
        });
        this.messagesSent.incrementAndGet();
        tracker.incrementSentCount(flowFile);
    }

    void ackConsumerOffsets(String topic, int partition, long offset, Integer leaderEpoch, String consumerGroupId) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset + 1L, Optional.ofNullable(leaderEpoch), null);
        Map<TopicPartition, OffsetAndMetadata> offsetMap = Collections.singletonMap(topicPartition, offsetAndMetadata);
        this.logger.debug("Acknowledging Consumer Offsets for topic={}, partition={}, offset={}, consumerGroup={}, leaderEpoch={}", new Object[]{topic, partition, offset, consumerGroupId, leaderEpoch});
        this.producer.sendOffsetsToTransaction(offsetMap, consumerGroupId);
    }

    public PublishResult complete() {
        if (this.tracker == null) {
            if (this.messagesSent.get() == 0L) {
                return PublishResult.EMPTY;
            }
            this.rollback();
            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
        }
        try {
            this.producer.flush();
            if (this.activeTransaction) {
                this.producer.commitTransaction();
                this.activeTransaction = false;
            }
        }
        catch (FencedInstanceIdException | ProducerFencedException e) {
            throw e;
        }
        catch (Exception e) {
            this.poison();
            throw e;
        }
        try {
            this.tracker.awaitCompletion(this.maxAckWaitMillis);
            PublishResult e = this.tracker.createPublishResult();
            return e;
        }
        catch (InterruptedException e) {
            this.logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            Thread.currentThread().interrupt();
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        catch (TimeoutException e) {
            this.logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
            PublishResult publishResult = this.tracker.failOutstanding(e);
            return publishResult;
        }
        finally {
            this.tracker = null;
        }
    }

    @Override
    public void close() {
        this.producer.close(this.maxAckWaitMillis, TimeUnit.MILLISECONDS);
        this.tracker = null;
    }

    public InFlightMessageTracker getTracker() {
        if (this.tracker == null) {
            this.tracker = new InFlightMessageTracker(this.logger);
        }
        return this.tracker;
    }

    public List<ConfigVerificationResult> verifyConfiguration(String topic) {
        ArrayList<ConfigVerificationResult> verificationResults = new ArrayList<ConfigVerificationResult>();
        try {
            List partitionInfos = this.producer.partitionsFor(topic);
            verificationResults.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Topic Partitions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Determined that there are " + partitionInfos.size() + " partitions for topic " + topic).build());
        }
        catch (Exception e) {
            this.logger.error("Failed to determine Partition Information for Topic {} in order to verify configuration", new Object[]{topic, e});
            verificationResults.add(new ConfigVerificationResult.Builder().verificationStepName("Determine Topic Partitions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation("Could not fetch Partition Information: " + e).build());
        }
        return verificationResults;
    }
}

