/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.parquet;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.InputFile;

@Tags(value={"parquet", "split", "partition", "break apart", "efficient processing", "load balance", "cluster"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="The processor generates one FlowFile from each Row Group of the input, and adds attributes with the offsets required to read the group of rows in the FlowFile's content. Can be used to increase the overall efficiency of processing extremely large Parquet files.")
@WritesAttributes(value={@WritesAttribute(attribute="parquet.file.range.startOffset", description="Sets the start offset of the selected row group in the parquet file."), @WritesAttribute(attribute="parquet.file.range.endOffset", description="Sets the end offset of the selected row group in the parquet file."), @WritesAttribute(attribute="record.count", description="Sets the count of records in the selected row group.")})
@SideEffectFree
public class CalculateParquetRowGroupOffsets
extends AbstractProcessor {
    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new PropertyDescriptor.Builder().name("Zero Content Output").description("Whether to do, or do not copy the content of input FlowFile.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles, with special attributes that represent a chunk of the input file.").build();
    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.singletonList(PROP_ZERO_CONTENT_OUTPUT);
    static final Set<Relationship> RELATIONSHIPS = new HashSet<Relationship>(Collections.singletonList(REL_SUCCESS));

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile original = session.get();
        if (original == null) {
            return;
        }
        boolean zeroContentOutput = context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
        ParquetMetadata parquetMetadata = this.getParquetMetadata(session, original);
        List<FlowFile> partitions = this.getPartitions(session, original, parquetMetadata.getBlocks(), zeroContentOutput);
        session.transfer(partitions, REL_SUCCESS);
        session.adjustCounter("Partitions Created", (long)partitions.size(), false);
        if (zeroContentOutput) {
            session.remove(original);
        }
    }

    /*
     * Enabled aggressive exception aggregation
     */
    private ParquetMetadata getParquetMetadata(ProcessSession session, FlowFile flowFile) {
        ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
        try (InputStream in = session.read(flowFile);){
            ParquetMetadata parquetMetadata;
            try (ParquetFileReader reader = new ParquetFileReader((InputFile)new NifiParquetInputFile(in, flowFile.getSize()), readOptions);){
                parquetMetadata = reader.getFooter();
            }
            return parquetMetadata;
        }
        catch (IOException e) {
            throw new ProcessException((Throwable)e);
        }
    }

    private List<FlowFile> getPartitions(ProcessSession session, FlowFile flowFile, List<BlockMetaData> blocks, boolean zeroContentOutput) {
        ArrayList<FlowFile> results = new ArrayList<FlowFile>(blocks.size());
        for (int currentPartition = 0; currentPartition < blocks.size(); ++currentPartition) {
            final BlockMetaData currentBlock = blocks.get(currentPartition);
            final long currentBlockStartOffset = currentBlock.getStartingPos();
            final long currentBlockEndOffset = currentBlockStartOffset + currentBlock.getCompressedSize();
            FlowFile outputFlowFile = zeroContentOutput ? session.create(flowFile) : (currentPartition == 0 ? flowFile : session.clone(flowFile));
            results.add(session.putAllAttributes(outputFlowFile, (Map)new HashMap<String, String>(){
                {
                    this.put("parquet.file.range.startOffset", String.valueOf(currentBlockStartOffset));
                    this.put("parquet.file.range.endOffset", String.valueOf(currentBlockEndOffset));
                    this.put("record.count", String.valueOf(currentBlock.getRowCount()));
                }
            }));
        }
        return results;
    }
}

