/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.parquet;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;

@Tags(value={"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="The processor generates N flow files from the input, and adds attributes with the offsets required to read the group of rows in the FlowFile's content. Can be used to increase the overall efficiency of processing extremely large Parquet files.")
@WritesAttributes(value={@WritesAttribute(attribute="record.offset", description="Sets the index of first record of the parquet file."), @WritesAttribute(attribute="record.count", description="Sets the number of records in the parquet file.")})
@ReadsAttributes(value={@ReadsAttribute(attribute="record.offset", description="Gets the index of first record in the input."), @ReadsAttribute(attribute="record.count", description="Gets the number of records in the input."), @ReadsAttribute(attribute="parquet.file.range.startOffset", description="Gets the start offset of the selected row group in the parquet file."), @ReadsAttribute(attribute="parquet.file.range.endOffset", description="Gets the end offset of the selected row group in the parquet file.")})
@SideEffectFree
/**
 * Splits the record range of an incoming Parquet FlowFile into partitions of at most
 * "Records Per Split" records. Every partition is emitted to {@code success} as a FlowFile
 * carrying the {@code record.offset} and {@code record.count} attributes of its slice.
 *
 * <p>Depending on "Zero Content Output" the partitions are either content-less children of the
 * input (the input is then removed), or the input itself followed by clones of it.
 */
public class CalculateParquetOffsets
extends AbstractProcessor {
    // FlowFile attribute names read and written by this processor.
    private static final String ATTR_RECORD_OFFSET = "record.offset";
    private static final String ATTR_RECORD_COUNT = "record.count";
    private static final String ATTR_FILE_RANGE_START_OFFSET = "parquet.file.range.startOffset";
    private static final String ATTR_FILE_RANGE_END_OFFSET = "parquet.file.range.endOffset";

    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new PropertyDescriptor.Builder()
            .name("Records Per Split")
            .description("Specifies how many records should be covered in each FlowFile")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .required(true)
            .build();

    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder()
            .name("Zero Content Output")
            .description("Whether to do, or do not copy the content of input FlowFile.")
            .allowableValues("true", "false")
            .defaultValue("false")
            .required(true)
            .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles, with special attributes that represent a chunk of the input file.")
            .build();

    // Exposed through the getters below, hence wrapped so callers cannot modify the shared instances.
    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
            Collections.unmodifiableList(Arrays.asList(PROP_RECORDS_PER_SPLIT, PROP_ZERO_CONTENT_OUTPUT));
    static final Set<Relationship> RELATIONSHIPS =
            Collections.unmodifiableSet(new HashSet<Relationship>(Collections.singletonList(REL_SUCCESS)));

    /**
     * @return the properties supported by this processor
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    /**
     * @return the relationships of this processor (only {@code success})
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Takes one FlowFile from the session, determines its record range and transfers one
     * FlowFile per partition to {@code success}.
     *
     * <p>The range starts at the {@code record.offset} attribute (0 when absent). Its length is
     * the {@code record.count} attribute when present; otherwise it is the record count read
     * from the Parquet content minus the offset. A negative length is treated as 0.
     *
     * @param context provides the configured property values
     * @param session the session the FlowFile is taken from and the partitions are created in
     * @throws ProcessException if "Records Per Split" does not evaluate to a positive number or
     *                          the Parquet content cannot be read
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final FlowFile inputFlowFile = session.get();
        if (inputFlowFile == null) {
            return;
        }

        // The property supports Expression Language against FlowFile attributes, so it has to
        // be evaluated against the FlowFile before it can be converted to a number.
        final Long recordsPerSplit = context.getProperty(PROP_RECORDS_PER_SPLIT)
                .evaluateAttributeExpressions(inputFlowFile)
                .asLong();
        if (recordsPerSplit == null || recordsPerSplit <= 0L) {
            // Guards the division in getPartitions against a missing, zero or negative value.
            throw new ProcessException("Records Per Split must evaluate to a positive number but was " + recordsPerSplit);
        }
        final long partitionSize = recordsPerSplit;
        final boolean zeroContentOutput = context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();

        final long recordOffset = Optional.ofNullable(inputFlowFile.getAttribute(ATTR_RECORD_OFFSET))
                .map(Long::valueOf)
                .orElse(0L);
        final long declaredOrComputedCount = Optional.ofNullable(inputFlowFile.getAttribute(ATTR_RECORD_COUNT))
                .map(Long::valueOf)
                .orElseGet(() -> getRecordCount(session, inputFlowFile) - recordOffset);
        // An offset beyond the available records (or a negative attribute) yields a negative
        // count; clamp it so the partition arithmetic and the counter stay non-negative.
        final long recordCount = Math.max(0L, declaredOrComputedCount);

        final List<FlowFile> partitions = getPartitions(session, inputFlowFile, partitionSize, recordCount, recordOffset, zeroContentOutput);
        session.transfer(partitions, REL_SUCCESS);
        session.adjustCounter("Records Split", recordCount, false);
        session.adjustCounter("Partitions Created", partitions.size(), false);

        if (zeroContentOutput) {
            session.remove(inputFlowFile);
        } else if (partitions.isEmpty()) {
            // With no records there is no first partition that reuses the input FlowFile, so it
            // would be left neither transferred nor removed. Forward it with an empty range.
            session.transfer(session.putAllAttributes(inputFlowFile, rangeAttributes(recordOffset, 0L)), REL_SUCCESS);
        }
    }

    /**
     * Reads the number of records from the Parquet content of the given FlowFile.
     *
     * <p>The read is restricted to the byte range given by the
     * {@code parquet.file.range.startOffset} and {@code parquet.file.range.endOffset}
     * attributes; absent attributes default to 0 and {@link Long#MAX_VALUE} respectively.
     *
     * @param session  the session used to open the FlowFile content
     * @param flowFile the FlowFile holding the Parquet content
     * @return the record count reported by the Parquet reader for the selected range
     * @throws ProcessException if reading the content fails with an {@link IOException}
     */
    private long getRecordCount(final ProcessSession session, final FlowFile flowFile) {
        final long fileStartOffset = Optional.ofNullable(flowFile.getAttribute(ATTR_FILE_RANGE_START_OFFSET))
                .map(Long::parseLong)
                .orElse(0L);
        final long fileEndOffset = Optional.ofNullable(flowFile.getAttribute(ATTR_FILE_RANGE_END_OFFSET))
                .map(Long::parseLong)
                .orElse(Long.MAX_VALUE);
        final ParquetReadOptions readOptions = ParquetReadOptions.builder()
                .withRange(fileStartOffset, fileEndOffset)
                .build();

        try (InputStream in = session.read(flowFile)) {
            final InputFile inputFile = new NifiParquetInputFile(in, flowFile.getSize());
            try (ParquetFileReader reader = new ParquetFileReader(inputFile, readOptions)) {
                return reader.getRecordCount();
            }
        } catch (final IOException e) {
            throw new ProcessException(e);
        }
    }

    /**
     * Creates one FlowFile per partition of the record range and tags each with its
     * {@code record.offset} and {@code record.count}.
     *
     * @param session           the session used to create, clone and update FlowFiles
     * @param inputFlowFile     the FlowFile being split
     * @param partitionSize     maximum number of records per partition; must be positive
     * @param recordCount       number of records in the range; values below 1 yield no partition
     * @param recordOffset      index of the first record of the range
     * @param zeroContentOutput when {@code true} every partition is a child created from the
     *                          input; otherwise the first partition is the input itself and the
     *                          remaining ones are clones of it
     * @return the partition FlowFiles in ascending offset order; empty when there are no records
     */
    private List<FlowFile> getPartitions(final ProcessSession session, final FlowFile inputFlowFile, final long partitionSize, final long recordCount, final long recordOffset, final boolean zeroContentOutput) {
        final long coveredRecords = Math.max(0L, recordCount);
        // Ceiling division: a trailing partial partition still needs its own FlowFile.
        final long numberOfPartitions = coveredRecords / partitionSize + (coveredRecords % partitionSize > 0L ? 1 : 0);
        final List<FlowFile> results = new ArrayList<FlowFile>((int) Math.min(Integer.MAX_VALUE, numberOfPartitions));

        for (long currentPartition = 0L; currentPartition < numberOfPartitions; ++currentPartition) {
            final long addedOffset = currentPartition * partitionSize;

            final FlowFile outputFlowFile;
            if (zeroContentOutput) {
                outputFlowFile = session.create(inputFlowFile);
            } else if (currentPartition == 0L) {
                outputFlowFile = inputFlowFile;
            } else {
                outputFlowFile = session.clone(inputFlowFile);
            }

            // The last partition may hold fewer than partitionSize records.
            final long partitionRecords = Math.min(partitionSize, coveredRecords - addedOffset);
            results.add(session.putAllAttributes(outputFlowFile, rangeAttributes(recordOffset + addedOffset, partitionRecords)));
        }
        return results;
    }

    /**
     * Builds the attribute map describing one record range.
     *
     * @param offset index of the first record of the range
     * @param count  number of records in the range
     * @return a new map holding {@code record.offset} and {@code record.count}
     */
    private static Map<String, String> rangeAttributes(final long offset, final long count) {
        final Map<String, String> attributes = new HashMap<String, String>();
        attributes.put(ATTR_RECORD_OFFSET, Long.toString(offset));
        attributes.put(ATTR_RECORD_COUNT, Long.toString(count));
        return attributes;
    }
}

