/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.CompressionType;
import org.apache.nifi.processors.hadoop.ListHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;

/**
 * Fetches files from an HDFS directory (optionally recursing into subdirectories) and emits one
 * FlowFile per file. Unless {@link #KEEP_SOURCE_FILE} is {@code true}, each file is deleted from
 * HDFS after its content has been imported.
 *
 * <p>Listing and fetching are decoupled. {@link #onTrigger} refills {@code filePathQueue} from a
 * directory listing when the queue is less than half full. It then drains up to
 * {@link #BATCH_SIZE} paths into {@code processing} while they are being fetched. Paths that are
 * queued or in {@code processing} are skipped by later listings, so a path is not queued twice.
 */
// NOTE(review): the "path" attribute description below says "./" for files in the root
// directory, but processBatchOfFiles writes "." in that case. Confirm which is intended.
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"hadoop", "HCFS", "HDFS", "get", "fetch", "ingest", "source", "filesystem"})
@CapabilityDescription(value="Fetch files from Hadoop Distributed File System (HDFS) into FlowFiles. This Processor will delete the file from HDFS after fetching it.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), @WritesAttribute(attribute="path", description="The path is set to the relative path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"abc/1/2/3\".")})
@SeeAlso(value={PutHDFS.class, ListHDFS.class})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.READ_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), @Restriction(requiredPermission=RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")})
public class GetHDFS
extends AbstractHadoopProcessor {
    /** Hadoop configuration key consulted for the read buffer size when {@link #BUFFER_SIZE} is unset. */
    public static final String BUFFER_SIZE_KEY = "io.file.buffer.size";
    /** Read buffer size used when neither {@link #BUFFER_SIZE} nor {@link #BUFFER_SIZE_KEY} is set. */
    public static final int BUFFER_SIZE_DEFAULT = 4096;
    /** Capacity of the pending-file queue; also caps the number of files taken from one directory listing. */
    public static final int MAX_WORKING_QUEUE_SIZE = 25000;

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All files retrieved from HDFS are transferred to this relationship").build();

    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder().name("Recurse Subdirectories").description("Indicates whether to pull files from subdirectories of the HDFS directory").required(true).allowableValues("true", "false").defaultValue("true").build();
    public static final PropertyDescriptor KEEP_SOURCE_FILE = new PropertyDescriptor.Builder().name("Keep Source File").description("Determines whether to delete the file from HDFS after it has been successfully transferred. If true, the file will be fetched repeatedly. This is intended for testing only.").required(true).allowableValues("true", "false").defaultValue("false").build();
    public static final PropertyDescriptor FILE_FILTER_REGEX = new PropertyDescriptor.Builder().name("File Filter Regex").description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched").required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor FILTER_MATCH_NAME_ONLY = new PropertyDescriptor.Builder().name("Filter Match Name Only").description("If true then File Filter Regex will match on just the filename, otherwise subdirectory names will be included with filename in the regex comparison").required(true).allowableValues("true", "false").defaultValue("true").build();
    public static final PropertyDescriptor IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder().name("Ignore Dotted Files").description("If true, files whose names begin with a dot (\".\") will be ignored").required(true).allowableValues("true", "false").defaultValue("true").build();
    public static final PropertyDescriptor MIN_AGE = new PropertyDescriptor.Builder().name("Minimum File Age").description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored").required(true).addValidator(StandardValidators.createTimePeriodValidator(0L, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)).defaultValue("0 sec").build();
    public static final PropertyDescriptor MAX_AGE = new PropertyDescriptor.Builder().name("Maximum File Age").description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored").required(false).addValidator(StandardValidators.createTimePeriodValidator(100L, TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS)).build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("Batch Size").description("The maximum number of files to pull in each iteration, based on run schedule.").required(true).defaultValue("100").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor POLLING_INTERVAL = new PropertyDescriptor.Builder().name("Polling Interval").description("Indicates how long to wait between performing directory listings").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("0 sec").build();
    public static final PropertyDescriptor BUFFER_SIZE = new PropertyDescriptor.Builder().name("IO Buffer Size").description("Amount of memory to use to buffer file contents during IO. This overrides the Hadoop Configuration").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();

    private static final Set<Relationship> relationships = Collections.singleton(REL_SUCCESS);

    /** Snapshot of the filter/age/recursion settings, rebuilt on every {@link #onScheduled}. */
    protected ProcessorConfiguration processorConfig;
    /** Remaining number of listing summaries to log; reset to 3 whenever a listing is non-empty. */
    private final AtomicLong logEmptyListing = new AtomicLong(2L);
    /** Wall-clock time (ms) at which the last directory listing finished. */
    private final AtomicLong lastPollTime = new AtomicLong(0L);
    /** Ensures only one task performs a directory listing at a time (acquired via tryLock). */
    private final Lock listingLock = new ReentrantLock();
    /** Guards compound operations spanning filePathQueue and processing. */
    private final Lock queueLock = new ReentrantLock();
    /** Paths discovered by listing and waiting to be fetched. */
    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<>(MAX_WORKING_QUEUE_SIZE);
    /** Paths handed to a batch whose session has not finished committing yet. */
    private final BlockingQueue<Path> processing = new LinkedBlockingQueue<>();

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> props = new ArrayList<>(properties);
        props.add(DIRECTORY);
        props.add(RECURSE_SUBDIRS);
        props.add(KEEP_SOURCE_FILE);
        props.add(FILE_FILTER_REGEX);
        props.add(FILTER_MATCH_NAME_ONLY);
        props.add(IGNORE_DOTTED_FILES);
        props.add(MIN_AGE);
        props.add(MAX_AGE);
        props.add(POLLING_INTERVAL);
        props.add(BATCH_SIZE);
        props.add(BUFFER_SIZE);
        props.add(COMPRESSION_CODEC);
        return props;
    }

    /**
     * Adds GetHDFS-specific checks to the parent validation. It checks that the minimum age does
     * not exceed the maximum age, and that the evaluated Directory value can be turned into a
     * Hadoop {@link Path}.
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext context) {
        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(context));

        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        final long minimumAge = minAgeProp == null ? 0L : minAgeProp;
        final long maximumAge = maxAgeProp == null ? Long.MAX_VALUE : maxAgeProp;
        if (minimumAge > maximumAge) {
            problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration").explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
        }

        try {
            new Path(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
        } catch (final Exception e) {
            problems.add(new ValidationResult.Builder().valid(false).subject("Directory").explanation(e.getMessage()).build());
        }
        return problems;
    }

    /**
     * Initialises the Hadoop resources, captures the current property values into
     * {@link #processorConfig} and discards any paths left over from a previous run.
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) throws IOException {
        abstractOnScheduled(context);
        processorConfig = new ProcessorConfiguration(context);
        queueLock.lock();
        try {
            filePathQueue.clear();
            processing.clear();
        } finally {
            queueLock.unlock();
        }
    }

    /**
     * Refills the work queue from a directory listing if needed, then fetches one batch of files.
     * The batch is released from {@code processing} when the session commit completes, whether
     * it succeeds or fails.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final List<Path> files = new ArrayList<>(batchSize);

        // Only list again once the queue has drained below half capacity.
        if (filePathQueue.size() < MAX_WORKING_QUEUE_SIZE / 2) {
            try {
                final StopWatch stopWatch = new StopWatch(true);
                final Set<Path> listedFiles = performListing(context);
                stopWatch.stop();
                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

                // performListing returns null when the polling interval has not elapsed,
                // another task holds the listing lock, or the directory does not exist.
                if (listedFiles != null) {
                    final int newItems = enqueueNewFiles(listedFiles);
                    if (!listedFiles.isEmpty()) {
                        logEmptyListing.set(3L);
                    }
                    // Log a few summaries after each non-empty listing, then go quiet while
                    // listings stay empty.
                    if (logEmptyListing.getAndDecrement() > 0L) {
                        getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", new Object[]{millis, listedFiles.size(), newItems});
                    }
                }
            } catch (final IOException e) {
                handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
                getLogger().warn("Error while retrieving list of files due to {}", new Object[]{e.getMessage(), e});
                return;
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                context.yield();
                getLogger().warn("Interrupted while retrieving files", e);
                return;
            }
        }

        // Move a batch from the pending queue into 'processing' atomically with respect to listing.
        queueLock.lock();
        try {
            filePathQueue.drainTo(files, batchSize);
            if (files.isEmpty()) {
                context.yield();
                return;
            }
            processing.addAll(files);
        } finally {
            queueLock.unlock();
        }

        try {
            processBatchOfFiles(files, context, session);
        } catch (final RuntimeException e) {
            // The commit callback below is never registered on this path, so release the batch
            // here; otherwise these paths would be skipped by every later listing.
            releaseFromProcessing(files);
            throw e;
        }

        // Release on both outcomes. A failed commit must not leave the paths in 'processing',
        // because listings skip such paths and only onScheduled clears the set.
        session.commitAsync(() -> releaseFromProcessing(files), failure -> releaseFromProcessing(files));
    }

    /**
     * Offers each listed path that is neither queued nor being processed to the work queue.
     * Stops early when the queue is full.
     *
     * @return the number of paths actually added
     */
    private int enqueueNewFiles(final Set<Path> listedFiles) {
        int newItems = 0;
        queueLock.lock();
        try {
            for (final Path file : listedFiles) {
                if (filePathQueue.contains(file) || processing.contains(file)) {
                    continue;
                }
                if (!filePathQueue.offer(file)) {
                    break;
                }
                newItems++;
            }
        } catch (final Exception e) {
            getLogger().warn("Could not add to processing queue", e);
        } finally {
            queueLock.unlock();
        }
        return newItems;
    }

    /** Removes a finished batch from {@code processing} so its paths may be listed again. */
    private void releaseFromProcessing(final List<Path> files) {
        queueLock.lock();
        try {
            processing.removeAll(files);
        } finally {
            queueLock.unlock();
        }
    }

    /**
     * Imports each path in {@code files} as a new FlowFile and transfers it to
     * {@link #REL_SUCCESS}. If the configured or inferred codec applies, the content is
     * decompressed and the codec's default extension is stripped from the filename. Unless
     * Keep Source File is set, each file is deleted from HDFS; a file that cannot be deleted is
     * not ingested.
     *
     * <p>Paths that no longer exist are skipped. Any failure while handling a single file goes
     * to the auth-error handler or, failing that, is logged and followed by a session rollback
     * and a yield. Every opened stream is closed before moving on to the next file.
     */
    protected void processBatchOfFiles(final List<Path> files, final ProcessContext context, final ProcessSession session) {
        final Configuration conf = getConfiguration();
        final FileSystem hdfs = getFileSystem();
        final boolean keepSourceFiles = context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final Double bufferSizeProp = context.getProperty(BUFFER_SIZE).asDataSize(DataUnit.B);
        final int bufferSize = bufferSizeProp != null ? bufferSizeProp.intValue() : conf.getInt(BUFFER_SIZE_KEY, BUFFER_SIZE_DEFAULT);
        final Path rootDir = getNormalizedPath(context, DIRECTORY);

        final CompressionType compressionType = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).toString());
        final boolean inferCompressionCodec = compressionType == CompressionType.AUTOMATIC;
        CompressionCodec codec = null;
        if (inferCompressionCodec || compressionType != CompressionType.NONE) {
            codec = getCompressionCodec(context, conf);
        }
        final CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(conf);

        for (final Path file : files) {
            InputStream stream = null;
            try {
                final boolean exists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(file));
                if (!exists) {
                    // The file was removed after it was listed; nothing to fetch.
                    continue;
                }

                final String originalFilename = file.getName();
                final String relativePath = getPathDifference(rootDir, file);
                stream = getUserGroupInformation().doAs((PrivilegedExceptionAction<InputStream>) () -> hdfs.open(file, bufferSize));

                if (inferCompressionCodec) {
                    codec = compressionCodecFactory.getCodec(file);
                }
                final String outputFilename;
                if (codec != null) {
                    stream = codec.createInputStream(stream);
                    outputFilename = StringUtils.removeEnd(originalFilename, codec.getDefaultExtension());
                } else {
                    outputFilename = originalFilename;
                }

                FlowFile flowFile = session.create();
                final StopWatch stopWatch = new StopWatch(true);
                flowFile = session.importFrom(stream, flowFile);
                stopWatch.stop();
                final String dataRate = stopWatch.calculateDataRate(flowFile.getSize());
                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);

                flowFile = session.putAttribute(flowFile, CoreAttributes.PATH.key(), relativePath.isEmpty() ? "." : relativePath);
                flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), outputFilename);

                if (!keepSourceFiles && !getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.delete(file, false))) {
                    getLogger().warn("Could not remove {} from HDFS. Not ingesting this file ...", new Object[]{file});
                    session.remove(flowFile);
                    continue;
                }

                session.getProvenanceReporter().receive(flowFile, file.toString());
                session.transfer(flowFile, REL_SUCCESS);
                getLogger().info("retrieved {} from HDFS {} in {} milliseconds at a rate of {}", new Object[]{flowFile, file, millis, dataRate});
            } catch (final Throwable t) {
                if (!handleAuthErrors(t, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                    getLogger().error("Error retrieving file {} from HDFS due to {}", new Object[]{file, t});
                    session.rollback();
                    context.yield();
                }
            } finally {
                IOUtils.closeQuietly(stream);
            }
        }
    }

    /**
     * Lists the configured directory, at most once per Polling Interval and by one task at a
     * time.
     *
     * @return the selected paths, or {@code null} when no listing was performed in this call.
     *     That happens when the interval has not elapsed, another task holds the listing lock,
     *     or the directory does not exist.
     */
    protected Set<Path> performListing(final ProcessContext context) throws IOException, InterruptedException {
        final long pollingIntervalMillis = context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        final long nextPollTime = lastPollTime.get() + pollingIntervalMillis;
        Set<Path> listing = null;

        if (System.currentTimeMillis() >= nextPollTime && listingLock.tryLock()) {
            try {
                final FileSystem hdfs = getFileSystem();
                final Path directoryPath = getNormalizedPath(context, DIRECTORY);
                final boolean directoryExists = getUserGroupInformation().doAs((PrivilegedExceptionAction<Boolean>) () -> hdfs.exists(directoryPath));
                if (!directoryExists) {
                    context.yield();
                    getLogger().warn("The directory {} does not exist.", new Object[]{directoryPath});
                } else {
                    listing = selectFiles(hdfs, directoryPath, null);
                }
                lastPollTime.set(System.currentTimeMillis());
            } finally {
                listingLock.unlock();
            }
        }
        return listing;
    }

    /**
     * Collects the non-directory entries of {@code dir} that pass the path filter and whose age
     * lies strictly between the configured minimum and maximum. If recursion is enabled, it
     * descends into subdirectories.
     *
     * @param filesVisited paths already examined during this listing (guards against revisits);
     *     {@code null} starts a fresh set
     * @return the selected paths. Each directory level stops adding entries once it holds
     *     {@link #MAX_WORKING_QUEUE_SIZE} paths.
     */
    protected Set<Path> selectFiles(final FileSystem hdfs, final Path dir, Set<Path> filesVisited) throws IOException, InterruptedException {
        if (filesVisited == null) {
            filesVisited = new HashSet<>();
        }
        final Set<Path> files = new HashSet<>();
        final FileStatus[] fileStatuses = getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> hdfs.listStatus(dir));
        // The filter depends only on 'dir', so build it once per directory rather than per entry.
        final PathFilter pathFilter = processorConfig.getPathFilter(dir);

        for (final FileStatus file : fileStatuses) {
            if (files.size() >= MAX_WORKING_QUEUE_SIZE) {
                break;
            }
            final Path canonicalFile = file.getPath();
            if (!filesVisited.add(canonicalFile)) {
                continue;
            }
            if (file.isDirectory()) {
                if (processorConfig.getRecurseSubdirs()) {
                    files.addAll(selectFiles(hdfs, canonicalFile, filesVisited));
                }
                continue;
            }
            if (!pathFilter.accept(canonicalFile)) {
                continue;
            }
            final long fileAge = System.currentTimeMillis() - file.getModificationTime();
            if (processorConfig.getMinimumAge() < fileAge && fileAge < processorConfig.getMaximumAge()) {
                files.add(canonicalFile);
                if (getLogger().isDebugEnabled()) {
                    getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
                }
            }
        }
        return files;
    }

    /** Immutable snapshot of the filtering-related properties, taken when the processor is scheduled. */
    protected static class ProcessorConfiguration {
        private final Pattern fileFilterPattern;
        private final boolean ignoreDottedFiles;
        private final boolean filterMatchBasenameOnly;
        private final long minimumAge;
        private final long maximumAge;
        private final boolean recurseSubdirs;

        ProcessorConfiguration(final ProcessContext context) {
            ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
            final String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
            fileFilterPattern = fileFilterRegex == null ? null : Pattern.compile(fileFilterRegex);
            filterMatchBasenameOnly = context.getProperty(FILTER_MATCH_NAME_ONLY).asBoolean();
            final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
            minimumAge = minAgeProp == null ? 0L : minAgeProp;
            final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
            maximumAge = maxAgeProp == null ? Long.MAX_VALUE : maxAgeProp;
            recurseSubdirs = context.getProperty(RECURSE_SUBDIRS).asBoolean();
        }

        /** Minimum file age in milliseconds (0 when unset). */
        protected long getMinimumAge() {
            return minimumAge;
        }

        /** Maximum file age in milliseconds ({@link Long#MAX_VALUE} when unset). */
        protected long getMaximumAge() {
            return maximumAge;
        }

        public boolean getRecurseSubdirs() {
            return recurseSubdirs;
        }

        /**
         * Builds a filter that rejects dotted names when Ignore Dotted Files is set. It matches
         * the File Filter Regex against the bare name, or against the path relative to
         * {@code dir} when Filter Match Name Only is false. A missing regex accepts everything.
         */
        protected PathFilter getPathFilter(final Path dir) {
            return path -> {
                if (ignoreDottedFiles && path.getName().startsWith(".")) {
                    return false;
                }
                final String pathToCompare;
                if (filterMatchBasenameOnly) {
                    pathToCompare = path.getName();
                } else {
                    final String relativePath = AbstractHadoopProcessor.getPathDifference(dir, path);
                    pathToCompare = relativePath.isEmpty() ? path.getName() : relativePath + "/" + path.getName();
                }
                return fileFilterPattern == null || fileFilterPattern.matcher(pathToCompare).matches();
            };
        }
    }
}

