/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
import org.apache.nifi.kafka.shared.property.FailureStrategy;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0;
import org.apache.nifi.processors.kafka.pubsub.Partitioners;
import org.apache.nifi.processors.kafka.pubsub.PublishFailureStrategy;
import org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;
import org.apache.nifi.processors.kafka.pubsub.PublisherLease;
import org.apache.nifi.processors.kafka.pubsub.PublisherPool;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;

@Tags(value={"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
@CapabilityDescription(value="Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. The complementary NiFi processor for fetching messages is ConsumeKafkaRecord_2_0.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DynamicProperty(name="The name of a Kafka configuration property.", value="The value of a given Kafka configuration property.", description="These properties will be added on the Kafka configuration after loading any provided configuration properties. In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged. For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", expressionLanguageScope=ExpressionLanguageScope.VARIABLE_REGISTRY)
@WritesAttribute(attribute="msg.count", description="The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to FlowFiles that are routed to success.")
@SeeAlso(value={PublishKafka_2_0.class, ConsumeKafka_2_0.class, ConsumeKafkaRecord_2_0.class})
public class PublishKafkaRecord_2_0
extends AbstractProcessor
implements KafkaPublishComponent {
    protected static final String MSG_COUNT = "msg.count";
    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully sending the content to a Kafka node, without waiting for any acknowledgment from the node at all. This provides the best performance but may result in data loss.");
    static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(), Partitioners.RoundRobinPartitioner.class.getSimpleName(), "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
    static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner", "DefaultPartitioner", "Messages will be assigned to random partitions.");
    static final AllowableValue RECORD_PATH_PARTITIONING = new AllowableValue(Partitioners.RecordPathPartitioner.class.getName(), "RecordPath Partitioner", "Interprets the <Partition> property as a RecordPath that will be evaluated against each Record to determine which partition the Record will go to. All Records that have the same value for the given RecordPath will go to the same Partition.");
    static final AllowableValue EXPRESSION_LANGUAGE_PARTITIONING = new AllowableValue(Partitioners.ExpressionLanguagePartitioner.class.getName(), "Expression Language Partitioner", "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, so all Records in a given FlowFile will go to the same partition.");
    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder().name("topic").displayName("Topic Name").description("The name of the Kafka Topic to publish to.").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("The Record Reader to use for incoming FlowFiles").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before sending to Kafka").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).build();
    static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder().name("message-key-field").displayName("Message Key Field").description("The name of a field in the Input Records that should be used as the Key for the Kafka message.").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();
    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder().name("acks").displayName("Delivery Guarantee").description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED}).defaultValue(DELIVERY_REPLICATED.getValue()).build();
    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder().name("max.block.ms").displayName("Max Metadata Wait Time").description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the entire 'send' call. Corresponds to Kafka's 'max.block.ms' property").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("5 sec").build();
    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder().name("ack.wait.time").displayName("Acknowledgment Wait Time").description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).defaultValue("5 secs").build();
    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder().name("max.request.size").displayName("Max Request Size").description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).").required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder().name("partitioner.class").displayName("Partitioner class").description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.").allowableValues(new AllowableValue[]{ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, RECORD_PATH_PARTITIONING, EXPRESSION_LANGUAGE_PARTITIONING}).defaultValue(RANDOM_PARTITIONING.getValue()).required(false).build();
    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder().name("partition").displayName("Partition").description("Specifies which Partition Records will go to. How this value is interpreted is dictated by the <Partitioner class> property.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder().name("compression.type").displayName("Compression Type").description("This parameter allows you to specify the compression codec for all data generated by this producer.").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).allowableValues(new String[]{"none", "gzip", "snappy", "lz4"}).defaultValue("none").build();
    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder().name("attribute-name-regex").displayName("Attributes to Send as Headers (Regex)").description("A Regular Expression that is matched against all FlowFile attribute names. Any attribute whose name matches the regex will be added to the Kafka messages as a Header. If not specified, no FlowFile attributes will be added as headers.").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.NONE).required(false).build();
    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder().name("use-transactions").displayName("Use Transactions").description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(true).build();
    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder().name("transactional-id-prefix").displayName("Transactional Id Prefix").description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).required(false).build();
    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder().name("message-header-encoding").displayName("Message Header Encoding").description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, this property indicates the Character Encoding to use for serializing the headers.").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).defaultValue("UTF-8").required(false).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles for which all content was sent to Kafka.").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship").build();
    private static final List<PropertyDescriptor> PROPERTIES;
    private static final Set<Relationship> RELATIONSHIPS;
    private volatile PublisherPool publisherPool = null;
    private final RecordPathCache recordPathCache = new RecordPathCache(25);

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.").name(propertyDescriptorName).addValidator((Validator)new DynamicPropertyValidator(ProducerConfig.class)).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        String rawRecordPath;
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>(new KafkaClientCustomValidationFunction().apply(validationContext));
        boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
        if (useTransactions) {
            String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
            if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
                results.add(new ValidationResult.Builder().subject("Delivery Guarantee").valid(false).explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" Either change the <Use Transactions> property or the <Delivery Guarantee> property.").build());
            }
        }
        String partitionClass = validationContext.getProperty(PARTITION_CLASS).getValue();
        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
            ValidationResult result;
            String rawRecordPath2 = validationContext.getProperty(PARTITION).getValue();
            if (rawRecordPath2 == null) {
                results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation("The <Partition> property must be specified if using the RecordPath Partitioning class").build());
            } else if (!validationContext.isExpressionLanguagePresent(rawRecordPath2) && (result = new RecordPathValidator().validate(PARTITION.getDisplayName(), rawRecordPath2, validationContext)) != null) {
                results.add(result);
            }
        } else if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass) && (rawRecordPath = validationContext.getProperty(PARTITION).getValue()) == null) {
            results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation("The <Partition> property must be specified if using the Expression Language Partitioning class").build());
        }
        return results;
    }

    private synchronized PublisherPool getPublisherPool(ProcessContext context) {
        PublisherPool pool = this.publisherPool;
        if (pool != null) {
            return pool;
        }
        this.publisherPool = this.createPublisherPool(context);
        return this.publisherPool;
    }

    protected PublisherPool createPublisherPool(ProcessContext context) {
        int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
        String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
        Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
        boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
        String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
        TransactionIdSupplier transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
        String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
        Charset charset = Charset.forName(charsetName);
        StandardKafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
        Map kafkaProperties = propertyProvider.getProperties((PropertyContext)context);
        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
        return new PublisherPool(kafkaProperties, this.getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, (Supplier<String>)transactionalIdSupplier, attributeNamePattern, charset);
    }

    @OnStopped
    public void closePool() {
        if (this.publisherPool != null) {
            this.publisherPool.close();
        }
        this.publisherPool = null;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        List flowFiles = session.get(FlowFileFilters.newSizeBasedFilter((double)1.0, (DataUnit)DataUnit.MB, (int)500));
        if (flowFiles.isEmpty()) {
            return;
        }
        PublisherPool pool = this.getPublisherPool(context);
        if (pool == null) {
            context.yield();
            return;
        }
        String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
        String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
        PublishFailureStrategy failureStrategy = this.getFailureStrategy(context);
        long startTime = System.nanoTime();
        PublisherLease lease = pool.obtainPublisher();
        if (useTransactions) {
            lease.beginTransaction();
        }
        Iterator itr = flowFiles.iterator();
        while (itr.hasNext()) {
            FlowFile flowFile = (FlowFile)itr.next();
            if (!this.isScheduled()) {
                if (useTransactions) {
                    session.rollback();
                    lease.rollback();
                    if (lease == null) return;
                    lease.close();
                    return;
                }
                session.transfer(flowFile);
                itr.remove();
                continue;
            }
            String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
            String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
            Function<Record, Integer> partitioner = this.getPartitioner(context, flowFile);
            try {
                session.read(flowFile, in -> {
                    try {
                        RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());
                        RecordSet recordSet = reader.createRecordSet();
                        RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
                        lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
                    }
                    catch (SchemaNotFoundException | MalformedRecordException e) {
                        throw new ProcessException(e);
                    }
                });
            }
            catch (Exception e) {
                lease.fail(flowFile, e);
            }
        }
        PublishResult publishResult = lease.complete();
        if (publishResult.isFailure()) {
            this.getLogger().info("Failed to send FlowFile to kafka; transferring to specified failure strategy");
            failureStrategy.routeFlowFiles(session, flowFiles);
            if (lease == null) return;
            lease.close();
            return;
        }
        try {
            long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
            Iterator iterator = flowFiles.iterator();
            while (iterator.hasNext()) {
                FlowFile success = (FlowFile)iterator.next();
                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
                int msgCount = publishResult.getSuccessfulMessageCount(success);
                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                session.adjustCounter("Messages Sent", (long)msgCount, true);
                String transitUri = StandardTransitUriProvider.getTransitUri((String)securityProtocol, (String)bootstrapServers, (String)topic);
                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                session.transfer(success, REL_SUCCESS);
            }
            return;
        }
        catch (AuthorizationException | OutOfOrderSequenceException | ProducerFencedException e) {
            lease.poison();
            this.getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to specified failure strategy");
            failureStrategy.routeFlowFiles(session, flowFiles);
            context.yield();
            return;
        }
    }

    private Function<Record, Integer> getPartitioner(ProcessContext context, FlowFile flowFile) {
        String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
        if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
            String recordPath = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            RecordPath compiled = this.recordPathCache.getCompiled(recordPath);
            return record -> this.evaluateRecordPath(compiled, (Record)record);
        }
        if (EXPRESSION_LANGUAGE_PARTITIONING.getValue().equals(partitionClass)) {
            String partition = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
            int hash = Objects.hashCode(partition);
            return record -> hash;
        }
        return null;
    }

    private Integer evaluateRecordPath(RecordPath recordPath, Record record) {
        RecordPathResult result = recordPath.evaluate(record);
        LongAccumulator accumulator = new LongAccumulator(Long::sum, 0L);
        result.getSelectedFields().forEach(fieldValue -> {
            Object value = fieldValue.getValue();
            long hash = Objects.hashCode(value);
            accumulator.accumulate(hash);
        });
        return accumulator.intValue();
    }

    private PublishFailureStrategy getFailureStrategy(ProcessContext context) {
        String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
        if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {
            return (session, flowFiles) -> session.rollback();
        }
        return (session, flowFiles) -> session.transfer((Collection)flowFiles, REL_FAILURE);
    }

    static {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(BOOTSTRAP_SERVERS);
        properties.add(TOPIC);
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(USE_TRANSACTIONS);
        properties.add(FAILURE_STRATEGY);
        properties.add(TRANSACTIONAL_ID_PREFIX);
        properties.add(DELIVERY_GUARANTEE);
        properties.add(ATTRIBUTE_NAME_REGEX);
        properties.add(MESSAGE_HEADER_ENCODING);
        properties.add(SECURITY_PROTOCOL);
        properties.add(SASL_MECHANISM);
        properties.add(KERBEROS_CREDENTIALS_SERVICE);
        properties.add(KERBEROS_SERVICE_NAME);
        properties.add(KERBEROS_PRINCIPAL);
        properties.add(KERBEROS_KEYTAB);
        properties.add(SASL_USERNAME);
        properties.add(SASL_PASSWORD);
        properties.add(TOKEN_AUTHENTICATION);
        properties.add(SSL_CONTEXT_SERVICE);
        properties.add(MESSAGE_KEY_FIELD);
        properties.add(MAX_REQUEST_SIZE);
        properties.add(ACK_WAIT_TIME);
        properties.add(METADATA_WAIT_TIME);
        properties.add(PARTITION_CLASS);
        properties.add(PARTITION);
        properties.add(COMPRESSION_CODEC);
        PROPERTIES = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }
}

