/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.processors.hadoop.util.GSSExceptionRollbackYieldSessionHandler;
import org.apache.nifi.util.StopWatch;
import org.ietf.jgss.GSSException;

@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@Tags(value={"hadoop", "HCFS", "HDFS", "put", "move", "filesystem", "moveHDFS"})
@CapabilityDescription(value="Rename existing files or a directory of files (non-recursive) on Hadoop Distributed File System (HDFS).")
@ReadsAttribute(attribute="filename", description="The name of the file written to HDFS comes from the value of this attribute.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file written to HDFS is stored in this attribute."), @WritesAttribute(attribute="absolute.hdfs.path", description="The absolute path to the file on HDFS is stored in this attribute."), @WritesAttribute(attribute="hadoop.file.url", description="The hadoop url for the file is stored in this attribute.")})
@SeeAlso(value={PutHDFS.class, GetHDFS.class})
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.READ_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to retrieve any file that NiFi has access to in HDFS or the local filesystem."), @Restriction(requiredPermission=RequiredPermission.WRITE_DISTRIBUTED_FILESYSTEM, explanation="Provides operator the ability to delete any file that NiFi has access to in HDFS or the local filesystem.")})
public class MoveHDFS
extends AbstractHadoopProcessor {
    public static final String REPLACE_RESOLUTION = "replace";
    public static final String IGNORE_RESOLUTION = "ignore";
    public static final String FAIL_RESOLUTION = "fail";
    private static final Set<Relationship> relationships;
    public static final AllowableValue REPLACE_RESOLUTION_AV;
    public static final AllowableValue IGNORE_RESOLUTION_AV;
    public static final AllowableValue FAIL_RESOLUTION_AV;
    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";
    public static final Relationship REL_SUCCESS;
    public static final Relationship REL_FAILURE;
    public static final PropertyDescriptor CONFLICT_RESOLUTION;
    public static final PropertyDescriptor FILE_FILTER_REGEX;
    public static final PropertyDescriptor IGNORE_DOTTED_FILES;
    public static final PropertyDescriptor INPUT_DIRECTORY_OR_FILE;
    public static final PropertyDescriptor OUTPUT_DIRECTORY;
    public static final PropertyDescriptor OPERATION;
    public static final PropertyDescriptor REMOTE_OWNER;
    public static final PropertyDescriptor REMOTE_GROUP;
    protected ProcessorConfiguration processorConfig;
    private final AtomicLong logEmptyListing = new AtomicLong(2L);
    private final Lock listingLock = new ReentrantLock();
    private final Lock queueLock = new ReentrantLock();
    private final BlockingQueue<Path> filePathQueue = new LinkedBlockingQueue<Path>();
    private final BlockingQueue<Path> processing = new LinkedBlockingQueue<Path>();

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>(this.properties);
        props.add(CONFLICT_RESOLUTION);
        props.add(INPUT_DIRECTORY_OR_FILE);
        props.add(OUTPUT_DIRECTORY);
        props.add(OPERATION);
        props.add(FILE_FILTER_REGEX);
        props.add(IGNORE_DOTTED_FILES);
        props.add(REMOTE_OWNER);
        props.add(REMOTE_GROUP);
        return props;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws Exception {
        super.abstractOnScheduled(context);
        this.processorConfig = new ProcessorConfiguration(context);
        this.queueLock.lock();
        try {
            this.filePathQueue.clear();
            this.processing.clear();
        }
        finally {
            this.queueLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        ArrayList<Path> files;
        FlowFile flowFile;
        block26: {
            Path inputPath;
            flowFile = session.get();
            if (flowFile == null && context.hasIncomingConnection()) {
                return;
            }
            flowFile = flowFile != null ? flowFile : session.create();
            FileSystem hdfs = this.getFileSystem();
            String filenameValue = context.getProperty(INPUT_DIRECTORY_OR_FILE).evaluateAttributeExpressions(flowFile).getValue();
            try {
                inputPath = this.getNormalizedPath(context, INPUT_DIRECTORY_OR_FILE, flowFile);
                boolean directoryExists = (Boolean)this.getUserGroupInformation().doAs(() -> hdfs.exists(inputPath));
                if (!directoryExists) {
                    throw new IOException("Input Directory or File does not exist in HDFS");
                }
            }
            catch (Exception e) {
                if (this.handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler())) {
                    return;
                }
                this.getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[]{filenameValue, flowFile, e});
                flowFile = session.putAttribute(flowFile, "hdfs.failure.reason", e.getMessage());
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            files = new ArrayList<Path>();
            try {
                StopWatch stopWatch = new StopWatch(true);
                Set<Path> listedFiles = this.performListing(context, inputPath);
                stopWatch.stop();
                long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                if (listedFiles == null) break block26;
                int newItems = 0;
                this.queueLock.lock();
                try {
                    for (Path file : listedFiles) {
                        if (this.filePathQueue.contains(file) || this.processing.contains(file)) continue;
                        if (!this.filePathQueue.offer(file)) {
                            break;
                        }
                        ++newItems;
                    }
                }
                catch (Exception e) {
                    this.getLogger().warn("Could not add to processing queue due to {}", new Object[]{e.getMessage(), e});
                }
                finally {
                    this.queueLock.unlock();
                }
                if (listedFiles.size() > 0) {
                    this.logEmptyListing.set(3L);
                }
                if (this.logEmptyListing.getAndDecrement() > 0L) {
                    this.getLogger().info("Obtained file listing in {} milliseconds; listing had {} items, {} of which were new", new Object[]{millis, listedFiles.size(), newItems});
                }
            }
            catch (IOException e) {
                context.yield();
                this.getLogger().warn("Error while retrieving list of files", (Throwable)e);
                return;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                context.yield();
                this.getLogger().warn("Interrupted while retrieving files", (Throwable)e);
                return;
            }
        }
        this.queueLock.lock();
        try {
            this.filePathQueue.drainTo(files);
            if (files.isEmpty()) {
                session.remove(flowFile);
                context.yield();
                return;
            }
        }
        finally {
            this.queueLock.unlock();
        }
        try {
            this.processBatchOfFiles(files, context, session, flowFile);
            session.remove(flowFile);
        }
        catch (UncheckedIOException e) {
            this.handleAuthErrors(e, session, context, new GSSExceptionRollbackYieldSessionHandler());
        }
        this.queueLock.lock();
        try {
            this.processing.removeAll(files);
        }
        finally {
            this.queueLock.unlock();
        }
    }

    protected void processBatchOfFiles(List<Path> files, ProcessContext context, ProcessSession session, FlowFile parentFlowFile) {
        Objects.requireNonNull(parentFlowFile, "No parent flowfile for this batch was provided");
        Configuration conf = this.getConfiguration();
        FileSystem hdfs = this.getFileSystem();
        UserGroupInformation ugi = this.getUserGroupInformation();
        if (conf == null || ugi == null) {
            this.getLogger().error("Configuration or UserGroupInformation not configured properly");
            session.transfer(parentFlowFile, REL_FAILURE);
            context.yield();
            return;
        }
        for (Path file : files) {
            ugi.doAs(() -> {
                FlowFile flowFile = session.create(parentFlowFile);
                try {
                    String originalFilename = file.getName();
                    Path outputDirPath = this.getNormalizedPath(context, OUTPUT_DIRECTORY, parentFlowFile);
                    Path newFile = new Path(outputDirPath, originalFilename);
                    boolean destinationExists = hdfs.exists(newFile);
                    if (destinationExists) {
                        switch (this.processorConfig.getConflictResolution()) {
                            case "replace": {
                                if (!hdfs.delete(newFile, false)) break;
                                this.getLogger().info("deleted {} in order to replace with the contents of {}", new Object[]{newFile, flowFile});
                                break;
                            }
                            case "ignore": {
                                session.transfer(flowFile, REL_SUCCESS);
                                this.getLogger().info("transferring {} to success because file with same name already exists", new Object[]{flowFile});
                                return null;
                            }
                            case "fail": {
                                session.transfer(session.penalize(flowFile), REL_FAILURE);
                                this.getLogger().warn("penalizing {} and routing to failure because file with same name already exists", new Object[]{flowFile});
                                return null;
                            }
                        }
                    }
                    try {
                        if (!hdfs.getFileStatus(outputDirPath).isDirectory()) {
                            throw new IOException(outputDirPath + " already exists and is not a directory");
                        }
                    }
                    catch (FileNotFoundException fe) {
                        if (!hdfs.mkdirs(outputDirPath)) {
                            throw new IOException(outputDirPath + " could not be created");
                        }
                        this.changeOwner(context, hdfs, outputDirPath);
                    }
                    boolean moved = false;
                    for (int i = 0; i < 10; ++i) {
                        if (this.processorConfig.getOperation().equals("move")) {
                            if (hdfs.rename(file, newFile)) {
                                moved = true;
                                break;
                            }
                        } else if (FileUtil.copy((FileSystem)hdfs, (Path)file, (FileSystem)hdfs, (Path)newFile, (boolean)false, (Configuration)conf)) {
                            moved = true;
                            break;
                        }
                        Thread.sleep(200L);
                    }
                    if (!moved) {
                        throw new ProcessException("Could not move file " + file + " to its final filename");
                    }
                    this.changeOwner(context, hdfs, newFile);
                    String outputPath = newFile.toString();
                    String newFilename = newFile.getName();
                    String hdfsPath = newFile.getParent().toString();
                    flowFile = session.putAttribute(flowFile, CoreAttributes.FILENAME.key(), newFilename);
                    flowFile = session.putAttribute(flowFile, ABSOLUTE_HDFS_PATH_ATTRIBUTE, hdfsPath);
                    Path qualifiedPath = newFile.makeQualified(hdfs.getUri(), hdfs.getWorkingDirectory());
                    flowFile = session.putAttribute(flowFile, "hadoop.file.url", qualifiedPath.toString());
                    String transitUri = hdfs.getUri() + StringUtils.prependIfMissing((String)outputPath, (CharSequence)"/", (CharSequence[])new CharSequence[0]);
                    session.getProvenanceReporter().send(flowFile, transitUri);
                    session.transfer(flowFile, REL_SUCCESS);
                }
                catch (Throwable t) {
                    Optional causeOptional = this.findCause(t, GSSException.class, gsse -> 13 == gsse.getMajor());
                    if (causeOptional.isPresent()) {
                        throw new UncheckedIOException(new IOException((Throwable)causeOptional.get()));
                    }
                    this.getLogger().error("Failed to rename on HDFS", t);
                    session.transfer(session.penalize(flowFile), REL_FAILURE);
                    context.yield();
                }
                return null;
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Set<Path> performListing(ProcessContext context, Path path) throws IOException, InterruptedException {
        Set<Path> listing = null;
        if (this.listingLock.tryLock()) {
            try {
                FileSystem hdfs = this.getFileSystem();
                listing = this.selectFiles(hdfs, path, null);
            }
            finally {
                this.listingLock.unlock();
            }
        }
        return listing;
    }

    protected void changeOwner(ProcessContext context, FileSystem hdfs, Path name) {
        try {
            String owner = context.getProperty(REMOTE_OWNER).getValue();
            String group = context.getProperty(REMOTE_GROUP).getValue();
            if (owner != null || group != null) {
                hdfs.setOwner(name, owner, group);
            }
        }
        catch (Exception e) {
            this.getLogger().warn("Could not change owner or group of {} on HDFS due to {}", new Object[]{name, e.getMessage(), e});
        }
    }

    protected Set<Path> selectFiles(FileSystem hdfs, Path inputPath, Set<Path> filesVisited) throws IOException, InterruptedException {
        UserGroupInformation ugi;
        boolean directoryExists;
        if (null == filesVisited) {
            filesVisited = new HashSet<Path>();
        }
        if (!(directoryExists = ((Boolean)(ugi = this.getUserGroupInformation()).doAs(() -> hdfs.exists(inputPath))).booleanValue())) {
            throw new IOException("Selection directory " + inputPath.toString() + " doesn't appear to exist!");
        }
        HashSet<Path> files = new HashSet<Path>();
        FileStatus inputStatus = (FileStatus)ugi.doAs(() -> hdfs.getFileStatus(inputPath));
        if (inputStatus.isDirectory()) {
            FileStatus[] fileStatuses;
            for (FileStatus file : fileStatuses = (FileStatus[])ugi.doAs(() -> hdfs.listStatus(inputPath))) {
                Path canonicalFile = file.getPath();
                if (!filesVisited.add(canonicalFile) || file.isDirectory() || !this.processorConfig.getPathFilter(inputPath).accept(canonicalFile)) continue;
                files.add(canonicalFile);
                if (!this.getLogger().isDebugEnabled()) continue;
                this.getLogger().debug(this + " selected file at path: " + canonicalFile.toString());
            }
        } else if (inputStatus.isFile()) {
            files.add(inputPath);
        }
        return files;
    }

    static {
        REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION, REPLACE_RESOLUTION, "Replaces the existing file if any.");
        IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION, "Failed rename operation stops processing and routes to success.");
        FAIL_RESOLUTION_AV = new AllowableValue(FAIL_RESOLUTION, FAIL_RESOLUTION, "Failing to rename a file routes to failure.");
        REL_SUCCESS = new Relationship.Builder().name("success").description("Files that have been successfully renamed on HDFS are transferred to this relationship").build();
        REL_FAILURE = new Relationship.Builder().name("failure").description("Files that could not be renamed on HDFS are transferred to this relationship").build();
        CONFLICT_RESOLUTION = new PropertyDescriptor.Builder().name("Conflict Resolution Strategy").description("Indicates what should happen when a file with the same name already exists in the output directory").required(true).defaultValue(FAIL_RESOLUTION_AV.getValue()).allowableValues(new AllowableValue[]{REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV}).build();
        FILE_FILTER_REGEX = new PropertyDescriptor.Builder().name("File Filter Regex").description("A Java Regular Expression for filtering Filenames; if a filter is supplied then only files whose names match that Regular Expression will be fetched, otherwise all files will be fetched").required(false).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
        IGNORE_DOTTED_FILES = new PropertyDescriptor.Builder().name("Ignore Dotted Files").description("If true, files whose names begin with a dot (\".\") will be ignored").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        INPUT_DIRECTORY_OR_FILE = new PropertyDescriptor.Builder().name("Input Directory or File").description("The HDFS directory from which files should be read, or a single file to read.").defaultValue("${path}").required(true).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
        OUTPUT_DIRECTORY = new PropertyDescriptor.Builder().name("Output Directory").description("The HDFS directory where the files will be moved to").required(true).addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
        OPERATION = new PropertyDescriptor.Builder().name("HDFS Operation").description("The operation that will be performed on the source file").required(true).allowableValues(new String[]{"move", "copy"}).defaultValue("move").build();
        REMOTE_OWNER = new PropertyDescriptor.Builder().name("Remote Owner").description("Changes the owner of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change owner").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        REMOTE_GROUP = new PropertyDescriptor.Builder().name("Remote Group").description("Changes the group of the HDFS file to this value after it is written. This only works if NiFi is running as a user that has HDFS super user privilege to change group").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    protected static class ProcessorConfiguration {
        private final String conflictResolution;
        private final String operation;
        private final Pattern fileFilterPattern;
        private final boolean ignoreDottedFiles;

        ProcessorConfiguration(ProcessContext context) {
            this.conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
            this.operation = context.getProperty(OPERATION).getValue();
            String fileFilterRegex = context.getProperty(FILE_FILTER_REGEX).getValue();
            this.fileFilterPattern = fileFilterRegex == null ? null : Pattern.compile(fileFilterRegex);
            this.ignoreDottedFiles = context.getProperty(IGNORE_DOTTED_FILES).asBoolean();
        }

        public String getOperation() {
            return this.operation;
        }

        public String getConflictResolution() {
            return this.conflictResolution;
        }

        protected PathFilter getPathFilter(final Path dir) {
            return new PathFilter(){

                public boolean accept(Path path) {
                    if (ignoreDottedFiles && path.getName().startsWith(".")) {
                        return false;
                    }
                    String relativePath = AbstractHadoopProcessor.getPathDifference((Path)dir, (Path)path);
                    Object pathToCompare = relativePath.length() == 0 ? path.getName() : relativePath + "/" + path.getName();
                    return fileFilterPattern == null || fileFilterPattern.matcher((CharSequence)pathToCompare).matches();
                }
            };
        }
    }
}

