/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.FlowFileFilters;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.MergeContent;
import org.apache.nifi.processors.standard.PartitionRecord;
import org.apache.nifi.processors.standard.SplitRecord;
import org.apache.nifi.processors.standard.merge.AttributeStrategyUtil;
import org.apache.nifi.processors.standard.merge.RecordBinManager;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Processor that merges multiple record-oriented FlowFiles into a single FlowFile.
 *
 * <p>Incoming FlowFiles are read with the configured Record Reader and handed to a
 * {@link RecordBinManager}, keyed by a group id (see
 * {@link #getGroupId(ProcessContext, FlowFile, RecordSchema, ProcessSession)}). The bin manager is
 * created lazily on the first trigger and is purged and discarded when the processor is stopped.
 *
 * <p>The processor manages its own sessions (it extends {@link AbstractSessionFactoryProcessor}):
 * each batch of FlowFiles pulled from the queue gets its own {@link ProcessSession}.
 */
@SideEffectFree
@TriggerWhenEmpty
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"merge", "record", "content", "correlation", "stream", "event"})
@CapabilityDescription("This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information.")
@ReadsAttributes({
    @ReadsAttribute(attribute="fragment.identifier", description="Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together."),
    @ReadsAttribute(attribute="fragment.count", description="Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle.")
})
@WritesAttributes({
    @WritesAttribute(attribute="record.count", description="The merged FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile."),
    @WritesAttribute(attribute="mime.type", description="The MIME Type indicated by the Record Writer"),
    @WritesAttribute(attribute="merge.count", description="The number of FlowFiles that were merged into this bundle"),
    @WritesAttribute(attribute="merge.bin.age", description="The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output"),
    @WritesAttribute(attribute="merge.uuid", description="UUID of the merged FlowFile that will be added to the original FlowFiles attributes"),
    @WritesAttribute(attribute="<Attributes from Record Writer>", description="Any Attribute that the configured Record Writer returns will be added to the FlowFile.")
})
@SeeAlso({MergeContent.class, SplitRecord.class, PartitionRecord.class})
public class MergeRecord
extends AbstractSessionFactoryProcessor {
    // Attribute names read from / written to FlowFiles.
    public static final String FRAGMENT_ID_ATTRIBUTE = FragmentAttributes.FRAGMENT_ID.key();
    public static final String FRAGMENT_INDEX_ATTRIBUTE = FragmentAttributes.FRAGMENT_INDEX.key();
    public static final String FRAGMENT_COUNT_ATTRIBUTE = FragmentAttributes.FRAGMENT_COUNT.key();
    public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
    public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
    public static final String MERGE_UUID_ATTRIBUTE = "merge.uuid";

    // Allowable values for the Merge Strategy property.
    public static final AllowableValue MERGE_STRATEGY_BIN_PACK = new AllowableValue(
            "Bin-Packing Algorithm",
            "Bin-Packing Algorithm",
            "Generates 'bins' of FlowFiles and fills each bin as full as possible. FlowFiles are placed into a bin based on their size and optionally their attributes (if the <Correlation Attribute> property is set)");
    public static final AllowableValue MERGE_STRATEGY_DEFRAGMENT = new AllowableValue(
            "Defragment",
            "Defragment",
            "Combines fragments that are associated by attributes back into a single cohesive FlowFile. If using this strategy, all FlowFiles must have the attributes <fragment.identifier> and <fragment.count>. All FlowFiles with the same value for \"fragment.identifier\" will be grouped together. All FlowFiles in this group must have the same value for the \"fragment.count\" attribute. The ordering of the Records that are output is not guaranteed.");

    // Property descriptors.
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
            .name("record-reader")
            .displayName("Record Reader")
            .description("Specifies the Controller Service to use for reading incoming data")
            .identifiesControllerService(RecordReaderFactory.class)
            .required(true)
            .build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
            .name("record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing out the records")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();
    public static final PropertyDescriptor MERGE_STRATEGY = new PropertyDescriptor.Builder()
            .name("merge-strategy")
            .displayName("Merge Strategy")
            .description("Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles")
            .required(true)
            .allowableValues(new AllowableValue[]{MERGE_STRATEGY_BIN_PACK, MERGE_STRATEGY_DEFRAGMENT})
            .defaultValue(MERGE_STRATEGY_BIN_PACK.getValue())
            .build();
    public static final PropertyDescriptor CORRELATION_ATTRIBUTE_NAME = new PropertyDescriptor.Builder()
            .name("correlation-attribute-name")
            .displayName("Correlation Attribute Name")
            .description("If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
            .defaultValue(null)
            .build();
    public static final PropertyDescriptor MIN_SIZE = new PropertyDescriptor.Builder()
            .name("min-bin-size")
            .displayName("Minimum Bin Size")
            .description("The minimum size of for the bin")
            .required(true)
            .defaultValue("0 B")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    public static final PropertyDescriptor MAX_SIZE = new PropertyDescriptor.Builder()
            .name("max-bin-size")
            .displayName("Maximum Bin Size")
            .description("The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile.")
            .required(false)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
    public static final PropertyDescriptor MIN_RECORDS = new PropertyDescriptor.Builder()
            .name("min-records")
            .displayName("Minimum Number of Records")
            .description("The minimum number of records to include in a bin")
            .required(true)
            .defaultValue("1")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();
    public static final PropertyDescriptor MAX_RECORDS = new PropertyDescriptor.Builder()
            .name("max-records")
            .displayName("Maximum Number of Records")
            .description("The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFIle is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of records in the last input FlowFile.")
            .required(false)
            .defaultValue("1000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();
    public static final PropertyDescriptor MAX_BIN_COUNT = new PropertyDescriptor.Builder()
            .name("max.bin.count")
            .displayName("Maximum Number of Bins")
            .description("Specifies the maximum number of bins that can be held in memory at any one time. This number should not be smaller than the maximum number of concurrent threads for this Processor, or the bins that are created will often consist only of a single incoming FlowFile.")
            .defaultValue("10")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();
    public static final PropertyDescriptor MAX_BIN_AGE = new PropertyDescriptor.Builder()
            .name("max-bin-age")
            .displayName("Max Bin Age")
            .description("The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    // Relationships.
    public static final Relationship REL_MERGED = new Relationship.Builder()
            .name("merged")
            .description("The FlowFile containing the merged records")
            .build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
            .name("original")
            .description("The FlowFiles that were used to create the bundle")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure")
            .build();

    /** Lazily created in {@link #onTrigger}; cleared by {@link #resetState()}. May hold {@code null}. */
    private final AtomicReference<RecordBinManager> binManager = new AtomicReference<>();

    /**
     * Returns the properties this processor supports, in display order.
     *
     * @return a new mutable list of the supported property descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(RECORD_READER);
        properties.add(RECORD_WRITER);
        properties.add(MERGE_STRATEGY);
        properties.add(CORRELATION_ATTRIBUTE_NAME);
        properties.add(AttributeStrategyUtil.ATTRIBUTE_STRATEGY);
        properties.add(MIN_RECORDS);
        properties.add(MAX_RECORDS);
        properties.add(MIN_SIZE);
        properties.add(MAX_SIZE);
        properties.add(MAX_BIN_AGE);
        properties.add(MAX_BIN_COUNT);
        return properties;
    }

    /**
     * Returns the relationships this processor can route to.
     *
     * @return a new set containing the original, failure and merged relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_FAILURE);
        relationships.add(REL_MERGED);
        return relationships;
    }

    /**
     * Purges the current bin manager, if one exists, and drops the reference to it so that the
     * next trigger creates a fresh one.
     */
    @OnStopped
    public final void resetState() {
        final RecordBinManager manager = binManager.get();
        if (manager != null) {
            manager.purge();
        }
        binManager.set(null);
    }

    /**
     * Cross-property validation: the record-count limits must be positive, and neither maximum
     * (records or size) may be smaller than its corresponding minimum.
     *
     * @param validationContext context giving access to the configured property values
     * @return the validation failures found; empty when the configuration is consistent
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<>();

        final Integer minRecords = validationContext.getProperty(MIN_RECORDS).evaluateAttributeExpressions().asInteger();
        final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).evaluateAttributeExpressions().asInteger();
        if (minRecords != null && maxRecords != null && maxRecords < minRecords) {
            results.add(new ValidationResult.Builder()
                    .subject("Max Records")
                    .input(String.valueOf(maxRecords))
                    .valid(false)
                    .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property")
                    .build());
        }
        if (minRecords != null && minRecords <= 0) {
            results.add(new ValidationResult.Builder()
                    .subject("Min Records")
                    .input(String.valueOf(minRecords))
                    .valid(false)
                    .explanation("<Minimum Number of Records> property cannot be negative or zero")
                    .build());
        }
        if (maxRecords != null && maxRecords <= 0) {
            results.add(new ValidationResult.Builder()
                    .subject("Max Records")
                    .input(String.valueOf(maxRecords))
                    .valid(false)
                    .explanation("<Maximum Number of Records> property cannot be negative or zero")
                    .build());
        }

        final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
        final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
        if (minSize != null && maxSize != null && maxSize < minSize) {
            results.add(new ValidationResult.Builder()
                    .subject("Max Size")
                    .input(validationContext.getProperty(MAX_SIZE).getValue())
                    .valid(false)
                    .explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property")
                    .build());
        }

        return results;
    }

    /**
     * Pulls batches of FlowFiles from the queue and bins them until the queue is drained or the
     * processor is no longer scheduled, then asks the bin manager to complete bins.
     *
     * <p>Each batch uses its own session, which is committed asynchronously once every FlowFile in
     * the batch has either been binned or routed to failure. Failures while completing bins are
     * logged and do not propagate.
     *
     * @param context        the process context used to read property values
     * @param sessionFactory factory used to create one session per polled batch
     * @throws ProcessException declared by the overridden method
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
        // Lazily install a bin manager. If another thread wins the compareAndSet, adopt theirs;
        // loop again if the reference was cleared in between.
        RecordBinManager manager = binManager.get();
        while (manager == null) {
            manager = new RecordBinManager(context, sessionFactory, getLogger());
            manager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
            if (!binManager.compareAndSet(null, manager)) {
                manager = binManager.get();
            }
        }

        boolean flowFilePolled = false;
        while (isScheduled()) {
            final ProcessSession session = sessionFactory.createSession();
            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
            if (flowFiles.isEmpty()) {
                break;
            }
            flowFilePolled = true;

            if (getLogger().isDebugEnabled()) {
                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
                getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[]{ids.size(), ids});
            }

            // The 'block' flag is passed through to RecordBinManager.add; it is set for the
            // Defragment strategy or whenever a correlation attribute is configured.
            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
            final boolean block = MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)
                    || context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();

            try {
                for (final FlowFile flowFile : flowFiles) {
                    try {
                        binFlowFile(context, flowFile, session, manager, block);
                    } catch (final Exception e) {
                        getLogger().error("Failed to bin {} due to {}", new Object[]{flowFile, e, e});
                        session.transfer(flowFile, REL_FAILURE);
                    }
                }
            } finally {
                session.commitAsync();
            }

            completeExpiredBins(manager);
        }

        if (isScheduled()) {
            completeExpiredBins(manager);

            try {
                if (flowFilePolled) {
                    manager.completeFullBins();
                } else {
                    // Nothing was available this trigger: complete bins that are full enough, then yield.
                    manager.completeFullEnoughBins();
                    getLogger().debug("No more FlowFiles to bin; will yield");
                    context.yield();
                }
            } catch (final Exception e) {
                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", new Object[]{e, e});
            }
        }
    }

    /** Asks the manager to complete expired bins, logging (not propagating) any failure. */
    private void completeExpiredBins(final RecordBinManager manager) {
        try {
            manager.completeExpiredBins();
        } catch (final Exception e) {
            getLogger().error("Failed to merge FlowFiles to create new bin due to {}", new Object[]{e, e});
        }
    }

    /**
     * Opens the FlowFile with the configured Record Reader, derives its group id from the reader's
     * schema and adds it to the bin manager. The input stream and reader are closed on return.
     *
     * @throws ProcessException wrapping any I/O, schema-lookup or malformed-record failure
     */
    private void binFlowFile(final ProcessContext context, final FlowFile flowFile, final ProcessSession session,
                             final RecordBinManager manager, final boolean block) {
        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        try (InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
            final RecordSchema schema = reader.getSchema();
            final String groupId = getGroupId(context, flowFile, schema, session);
            getLogger().debug("Got Group ID {} for {}", new Object[]{groupId, flowFile});
            manager.add(groupId, flowFile, reader, session, block);
        } catch (final IOException | SchemaNotFoundException | MalformedRecordException e) {
            throw new ProcessException(e);
        }
    }

    /**
     * Computes the key used to decide which FlowFiles may share a bin.
     *
     * <p>With the Defragment strategy this is the FlowFile's fragment identifier attribute
     * (returned as-is, so it is {@code null} when the attribute is missing). Otherwise it is the
     * schema text, falling back to the Avro schema extracted from {@code schema} when no text is
     * available, with the correlation attribute's value appended when a correlation attribute is
     * configured and present on the FlowFile.
     *
     * @param context  the process context used to read property values
     * @param flowFile the FlowFile being binned
     * @param schema   the schema reported by the record reader for this FlowFile
     * @param session  the session the FlowFile belongs to (not used by this implementation)
     * @return the group id for the FlowFile
     */
    protected String getGroupId(final ProcessContext context, final FlowFile flowFile, final RecordSchema schema, final ProcessSession session) {
        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
            return flowFile.getAttribute(FRAGMENT_ID_ATTRIBUTE);
        }

        final Optional<String> optionalText = schema.getSchemaText();
        final String schemaText = optionalText.orElseGet(() -> AvroTypeUtil.extractAvroSchema(schema).toString());

        final String correlationAttributeName = context.getProperty(CORRELATION_ATTRIBUTE_NAME).getValue();
        if (correlationAttributeName == null) {
            return schemaText;
        }

        final String correlationValue = flowFile.getAttribute(correlationAttributeName);
        return correlationValue == null ? schemaText : schemaText + correlationValue;
    }

    /**
     * Returns the number of bins currently held by the bin manager.
     *
     * @return the manager's bin count, or 0 when no manager exists (before the first trigger or
     *         after the processor has been stopped)
     */
    int getBinCount() {
        final RecordBinManager manager = binManager.get();
        return manager == null ? 0 : manager.getBinCount();
    }
}

