/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.CompressionType;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.util.StopWatch;

@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"hadoop", "hcfs", "hdfs", "get", "ingest", "fetch", "source"})
@CapabilityDescription(value="Retrieves a file from HDFS. The content of the incoming FlowFile is replaced by the content of the file in HDFS. The file in HDFS is left intact without any changes being made to it.")
@WritesAttribute(attribute="hdfs.failure.reason", description="When a FlowFile is routed to 'failure', this attribute is added indicating why the file could not be fetched from HDFS")
@SeeAlso(value={ListHDFS.class, GetHDFS.class, PutHDFS.class})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.READ_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem.")})
public class FetchHDFS
extends AbstractHadoopProcessor {
    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder().name("HDFS Filename").description("The name of the HDFS file to retrieve").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("${path}/${filename}").addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("FlowFiles will be routed to this relationship once they have been updated with the content of the HDFS file").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieved and trying again will likely not be helpful. This would occur, for instance, if the file is not found or if there is a permissions issue").build();
    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder().name("comms.failure").description("FlowFiles will be routed to this relationship if the content of the HDFS file cannot be retrieve due to a communications failure. This generally indicates that the Fetch should be tried again.").build();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>(this.properties);
        props.add(FILENAME);
        props.add(COMPRESSION_CODEC);
        return props;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_COMMS_FAILURE);
        return relationships;
    }

    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        Path path;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final FileSystem hdfs = this.getFileSystem();
        UserGroupInformation ugi = this.getUserGroupInformation();
        String filenameValue = this.getPath(context, flowFile);
        try {
            path = this.getNormalizedPath(this.getPath(context, flowFile));
        }
        catch (IllegalArgumentException e) {
            this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, flowFile, e});
            flowFile = session.putAttribute(flowFile, this.getAttributePrefix() + ".failure.reason", e.getMessage());
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, this.getFailureRelationship());
            return;
        }
        final StopWatch stopWatch = new StopWatch(true);
        final FlowFile finalFlowFile = flowFile;
        ugi.doAs((PrivilegedAction)new PrivilegedAction<Object>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             * Loose catch block
             */
            @Override
            public Object run() {
                boolean inferCompressionCodec;
                FSDataInputStream stream = null;
                CompressionCodec codec = null;
                Configuration conf = FetchHDFS.this.getConfiguration();
                CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);
                CompressionType compressionType = FetchHDFS.this.getCompressionType(context);
                boolean bl = inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
                if (inferCompressionCodec) {
                    codec = compressionCodecFactory.getCodec(path);
                } else if (compressionType != CompressionType.NONE) {
                    codec = FetchHDFS.this.getCompressionCodec(context, FetchHDFS.this.getConfiguration());
                }
                FlowFile flowFile = finalFlowFile;
                Path qualifiedPath = path.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                try {
                    String outputFilename;
                    String originalFilename = path.getName();
                    stream = hdfs.open(path, 16384);
                    if (codec != null) {
                        stream = codec.createInputStream((InputStream)stream);
                        outputFilename = StringUtils.removeEnd((String)originalFilename, (String)codec.getDefaultExtension());
                    } else {
                        outputFilename = originalFilename;
                    }
                    flowFile = session.importFrom((InputStream)stream, finalFlowFile);
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);
                    stopWatch.stop();
                    FetchHDFS.this.getLogger().info("Successfully received content from {} for {} in {}", new Object[]{qualifiedPath, flowFile, stopWatch.getDuration()});
                    session.getProvenanceReporter().fetch(flowFile, qualifiedPath.toString(), stopWatch.getDuration(TimeUnit.MILLISECONDS));
                    session.transfer(flowFile, FetchHDFS.this.getSuccessRelationship());
                }
                catch (FileNotFoundException | AccessControlException e) {
                    FetchHDFS.this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{qualifiedPath, flowFile, e});
                    flowFile = session.putAttribute(flowFile, FetchHDFS.this.getAttributePrefix() + ".failure.reason", e.getMessage());
                    flowFile = session.penalize(flowFile);
                    session.transfer(flowFile, FetchHDFS.this.getFailureRelationship());
                    IOUtils.closeQuietly((InputStream)stream);
                }
                catch (IOException e2) {
                    FetchHDFS.this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to comms.failure", new Object[]{qualifiedPath, flowFile, e2});
                    flowFile = session.penalize(flowFile);
                    session.transfer(flowFile, FetchHDFS.this.getCommsFailureRelationship());
                    {
                        catch (Throwable throwable) {
                            IOUtils.closeQuietly(stream);
                            throw throwable;
                        }
                    }
                    IOUtils.closeQuietly((InputStream)stream);
                }
                IOUtils.closeQuietly((InputStream)stream);
                return null;
            }
        });
    }

    protected Relationship getSuccessRelationship() {
        return REL_SUCCESS;
    }

    protected Relationship getFailureRelationship() {
        return REL_FAILURE;
    }

    protected Relationship getCommsFailureRelationship() {
        return REL_COMMS_FAILURE;
    }

    protected String getPath(ProcessContext context, FlowFile flowFile) {
        return context.getProperty(FILENAME).evaluateAttributeExpressions(flowFile).getValue();
    }

    protected String getAttributePrefix() {
        return "hdfs";
    }

    protected CompressionType getCompressionType(ProcessContext context) {
        return CompressionType.valueOf((String)context.getProperty(COMPRESSION_CODEC).toString());
    }
}

