/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.parquet;

import com.google.common.collect.ImmutableSet;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.parquet.stream.NifiParquetOutputFile;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.OutputFile;

@Tags(value={"avro", "parquet", "convert"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, e.g.) can be converted to parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="Sets the filename to the existing filename with the extension replaced by / added to by .parquet"), @WritesAttribute(attribute="record.count", description="Sets the number of records in the parquet file.")})
public class ConvertAvroToParquet
extends AbstractProcessor {
    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
    private volatile List<PropertyDescriptor> parquetProps;
    static final Relationship SUCCESS = new Relationship.Builder().name("success").description("Parquet file that was converted successfully from Avro").build();
    static final Relationship FAILURE = new Relationship.Builder().name("failure").description("Avro content that could not be processed").build();
    static final Set<Relationship> RELATIONSHIPS = ImmutableSet.builder().add((Object)SUCCESS).add((Object)FAILURE).build();

    protected final void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(ParquetUtils.COMPRESSION_TYPE);
        props.add(ParquetUtils.ROW_GROUP_SIZE);
        props.add(ParquetUtils.PAGE_SIZE);
        props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
        props.add(ParquetUtils.MAX_PADDING_SIZE);
        props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
        props.add(ParquetUtils.ENABLE_VALIDATION);
        props.add(ParquetUtils.WRITER_VERSION);
        this.parquetProps = Collections.unmodifiableList(props);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.parquetProps;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            AtomicInteger totalRecordCount = new AtomicInteger(0);
            String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
            FlowFile putFlowFile = flowFile;
            putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
                try (BufferedInputStream in = new BufferedInputStream(rawIn);
                     DataFileStream dataFileReader = new DataFileStream((InputStream)in, (DatumReader)new GenericDatumReader());){
                    Schema avroSchema = dataFileReader.getSchema();
                    this.getLogger().debug(avroSchema.toString(true));
                    try (ParquetWriter writer = this.createParquetWriter(context, flowFile, rawOut, avroSchema);){
                        int recordCount = 0;
                        GenericRecord record = null;
                        while (dataFileReader.hasNext()) {
                            record = (GenericRecord)dataFileReader.next();
                            writer.write((Object)record);
                            ++recordCount;
                        }
                        totalRecordCount.set(recordCount);
                    }
                }
            });
            StringBuilder newFilename = new StringBuilder();
            int extensionIndex = fileName.lastIndexOf(".");
            if (extensionIndex != -1) {
                newFilename.append(fileName.substring(0, extensionIndex));
            } else {
                newFilename.append(fileName);
            }
            newFilename.append(".parquet");
            HashMap<String, String> outAttributes = new HashMap<String, String>();
            outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
            outAttributes.put(RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
            putFlowFile = session.putAllAttributes(putFlowFile, outAttributes);
            session.transfer(putFlowFile, SUCCESS);
            session.getProvenanceReporter().modifyContent(putFlowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
        }
        catch (ProcessException pe) {
            this.getLogger().error("Failed to convert {} from Avro to Parquet", new Object[]{flowFile, pe});
            session.transfer(flowFile, FAILURE);
        }
    }

    private ParquetWriter createParquetWriter(ProcessContext context, FlowFile flowFile, OutputStream out, Schema schema) throws IOException {
        NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
        AvroParquetWriter.Builder parquetWriter = AvroParquetWriter.builder((OutputFile)nifiParquetOutputFile).withSchema(schema);
        ParquetConfig parquetConfig = ParquetUtils.createParquetConfig((PropertyContext)context, flowFile.getAttributes());
        parquetConfig.setAvroReadCompatibility(true);
        parquetConfig.setAvroAddListElementRecords(false);
        parquetConfig.setAvroWriteOldListStructure(false);
        Configuration conf = new Configuration();
        ParquetUtils.applyCommonConfig(parquetWriter, conf, parquetConfig);
        return parquetWriter.build();
    }
}

