/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.avro;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

@SideEffectFree
@SupportsBatching
@Tags(value={"avro", "split"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
@WritesAttributes(value={@WritesAttribute(attribute="fragment.identifier", description="All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), @WritesAttribute(attribute="fragment.index", description="A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), @WritesAttribute(attribute="fragment.count", description="The number of split FlowFiles generated from the parent FlowFile"), @WritesAttribute(attribute="segment.original.filename ", description="The filename of the parent FlowFile")})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class SplitAvro
extends AbstractProcessor {
    public static final String RECORD_SPLIT_VALUE = "Record";
    public static final AllowableValue RECORD_SPLIT = new AllowableValue("Record", "Record", "Split at Record boundaries");
    public static final PropertyDescriptor SPLIT_STRATEGY = new PropertyDescriptor.Builder().name("Split Strategy").description("The strategy for splitting the incoming datafile. The Record strategy will read the incoming datafile by de-serializing each record.").required(true).allowableValues(new AllowableValue[]{RECORD_SPLIT}).defaultValue(RECORD_SPLIT.getValue()).build();
    public static final PropertyDescriptor OUTPUT_SIZE = new PropertyDescriptor.Builder().name("Output Size").description("The number of Avro records to include per split file. In cases where the incoming file has less records than the Output Size, or when the total number of records does not divide evenly by the Output Size, it is possible to get a split file with less records.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).required(true).defaultValue("1").build();
    public static final String DATAFILE_OUTPUT_VALUE = "Datafile";
    public static final String BARE_RECORD_OUTPUT_VALUE = "Bare Record";
    public static final AllowableValue DATAFILE_OUTPUT = new AllowableValue("Datafile", "Datafile", "Avro's object container file format");
    public static final AllowableValue BARE_RECORD_OUTPUT = new AllowableValue("Bare Record", "Bare Record", "Bare Avro records");
    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder().name("Output Strategy").description("Determines the format of the output. Either Avro Datafile, or bare record. Bare record output is only intended for use with systems that already require it, and shouldn't be needed for normal use.").required(true).allowableValues(new AllowableValue[]{DATAFILE_OUTPUT, BARE_RECORD_OUTPUT}).defaultValue(DATAFILE_OUTPUT.getValue()).build();
    public static final PropertyDescriptor TRANSFER_METADATA = new PropertyDescriptor.Builder().name("Transfer Metadata").description("Whether or not to transfer metadata from the parent datafile to the children. If the Output Strategy is Bare Record, then the metadata will be stored as FlowFile attributes, otherwise it will be in the Datafile header.").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original FlowFile that was split. If the FlowFile fails processing, nothing will be sent to this relationship").build();
    public static final Relationship REL_SPLIT = new Relationship.Builder().name("split").description("All new files split from the original FlowFile will be routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid Avro), it will be routed to this relationship").build();
    static final Set<String> RESERVED_METADATA;
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(SPLIT_STRATEGY);
        properties.add(OUTPUT_SIZE);
        properties.add(OUTPUT_STRATEGY);
        properties.add(TRANSFER_METADATA);
        this.properties = Collections.unmodifiableList(properties);
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_ORIGINAL);
        relationships.add(REL_SPLIT);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        String splitStrategy;
        String outputStrategy;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        int splitSize = context.getProperty(OUTPUT_SIZE).asInteger();
        boolean transferMetadata = context.getProperty(TRANSFER_METADATA).asBoolean();
        SplitWriter splitWriter = switch (outputStrategy = context.getProperty(OUTPUT_STRATEGY).getValue()) {
            case DATAFILE_OUTPUT_VALUE -> new DatafileSplitWriter(transferMetadata);
            case BARE_RECORD_OUTPUT_VALUE -> new BareRecordSplitWriter();
            default -> throw new AssertionError();
        };
        RecordSplitter splitter = switch (splitStrategy = context.getProperty(SPLIT_STRATEGY).getValue()) {
            case RECORD_SPLIT_VALUE -> new RecordSplitter(splitSize, transferMetadata);
            default -> throw new AssertionError();
        };
        try {
            List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
            String fragmentIdentifier = UUID.randomUUID().toString();
            IntStream.range(0, splits.size()).forEach(i -> {
                FlowFile split = (FlowFile)splits.get(i);
                split = session.putAttribute(split, FragmentAttributes.FRAGMENT_ID.key(), fragmentIdentifier);
                split = session.putAttribute(split, FragmentAttributes.FRAGMENT_INDEX.key(), Integer.toString(i));
                split = session.putAttribute(split, FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()));
                split = session.putAttribute(split, FragmentAttributes.FRAGMENT_COUNT.key(), Integer.toString(splits.size()));
                session.transfer(split, REL_SPLIT);
            });
            FlowFile originalFlowFile = FragmentAttributes.copyAttributesToOriginal((ProcessSession)session, (FlowFile)flowFile, (String)fragmentIdentifier, (int)splits.size());
            session.transfer(originalFlowFile, REL_ORIGINAL);
        }
        catch (ProcessException e) {
            this.getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage(), e});
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    static {
        HashSet<String> reservedMetadata = new HashSet<String>();
        reservedMetadata.add("avro.schema");
        reservedMetadata.add("avro.codec");
        RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
    }

    private static class DatafileSplitWriter
    implements SplitWriter {
        private final boolean transferMetadata;
        private DataFileWriter<GenericRecord> writer;

        public DatafileSplitWriter(boolean transferMetadata) {
            this.transferMetadata = transferMetadata;
        }

        @Override
        public void init(DataFileStream<GenericRecord> reader, String codec, OutputStream out) throws IOException {
            this.writer = new DataFileWriter((DatumWriter)new GenericDatumWriter());
            if (this.transferMetadata) {
                for (String metaKey : reader.getMetaKeys()) {
                    if (RESERVED_METADATA.contains(metaKey)) continue;
                    this.writer.setMeta(metaKey, reader.getMeta(metaKey));
                }
            }
            this.writer.setCodec(CodecFactory.fromString((String)codec));
            this.writer.create(reader.getSchema(), out);
        }

        @Override
        public void write(GenericRecord datum) throws IOException {
            this.writer.append((Object)datum);
        }

        @Override
        public void flush() throws IOException {
            this.writer.flush();
        }

        @Override
        public void close() throws IOException {
            this.writer.close();
        }
    }

    private static class BareRecordSplitWriter
    implements SplitWriter {
        private Encoder encoder;
        private DatumWriter<GenericRecord> writer;

        private BareRecordSplitWriter() {
        }

        @Override
        public void init(DataFileStream<GenericRecord> reader, String codec, OutputStream out) throws IOException {
            this.writer = new GenericDatumWriter(reader.getSchema());
            this.encoder = EncoderFactory.get().binaryEncoder(out, null);
        }

        @Override
        public void write(GenericRecord datum) throws IOException {
            this.writer.write((Object)datum, this.encoder);
        }

        @Override
        public void flush() throws IOException {
            this.encoder.flush();
        }

        @Override
        public void close() throws IOException {
        }
    }

    private static class RecordSplitter
    implements Splitter {
        private final int splitSize;
        private final boolean transferMetadata;

        public RecordSplitter(int splitSize, boolean transferMetadata) {
            this.splitSize = splitSize;
            this.transferMetadata = transferMetadata;
        }

        @Override
        public List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter) {
            final ArrayList<FlowFile> childFlowFiles = new ArrayList<FlowFile>();
            final AtomicReference<Object> recordHolder = new AtomicReference<Object>(null);
            session.read(originalFlowFile, new InputStreamCallback(){

                public void process(InputStream rawIn) throws IOException {
                    try (BufferedInputStream in = new BufferedInputStream(rawIn);
                         final DataFileStream reader = new DataFileStream((InputStream)in, (DatumReader)new GenericDatumReader());){
                        final AtomicReference<String> codec = new AtomicReference<String>(reader.getMetaString("avro.codec"));
                        if (codec.get() == null) {
                            codec.set("null");
                        }
                        final AtomicReference<Boolean> hasNextHolder = new AtomicReference<Boolean>(reader.hasNext());
                        while (hasNextHolder.get().booleanValue()) {
                            FlowFile childFlowFile = session.create(originalFlowFile);
                            childFlowFile = session.write(childFlowFile, new OutputStreamCallback(){

                                /*
                                 * WARNING - Removed try catching itself - possible behaviour change.
                                 */
                                public void process(OutputStream rawOut) throws IOException {
                                    try (BufferedOutputStream out = new BufferedOutputStream(rawOut);){
                                        splitWriter.init((DataFileStream<GenericRecord>)reader, (String)codec.get(), out);
                                        for (int recordCount = 0; ((Boolean)hasNextHolder.get()).booleanValue() && recordCount < splitSize; ++recordCount) {
                                            recordHolder.set((GenericRecord)reader.next((Object)((GenericRecord)recordHolder.get())));
                                            splitWriter.write((GenericRecord)recordHolder.get());
                                            hasNextHolder.set(reader.hasNext());
                                        }
                                        splitWriter.flush();
                                    }
                                    finally {
                                        splitWriter.close();
                                    }
                                }
                            });
                            if (splitWriter instanceof BareRecordSplitWriter && transferMetadata) {
                                HashMap<String, String> metadata = new HashMap<String, String>();
                                for (String metaKey : reader.getMetaKeys()) {
                                    metadata.put(metaKey, reader.getMetaString(metaKey));
                                }
                                childFlowFile = session.putAllAttributes(childFlowFile, metadata);
                            }
                            childFlowFiles.add(childFlowFile);
                        }
                    }
                }
            });
            return childFlowFiles;
        }
    }

    private static interface Splitter {
        public List<FlowFile> split(ProcessSession var1, FlowFile var2, SplitWriter var3);
    }

    private static interface SplitWriter {
        public void init(DataFileStream<GenericRecord> var1, String var2, OutputStream var3) throws IOException;

        public void write(GenericRecord var1) throws IOException;

        public void flush() throws IOException;

        public void close() throws IOException;
    }
}

