/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.FileTransfer;
import org.apache.nifi.processors.standard.util.PermissionDeniedException;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;

public abstract class FetchFileTransfer
extends AbstractProcessor {
    static final AllowableValue COMPLETION_NONE = new AllowableValue("None", "None", "Leave the file as-is");
    static final AllowableValue COMPLETION_MOVE = new AllowableValue("Move File", "Move File", "Move the file to the directory specified by the <Move Destination Directory> property");
    static final AllowableValue COMPLETION_DELETE = new AllowableValue("Delete File", "Delete File", "Deletes the original file from the remote system");
    static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder().name("Hostname").description("The fully-qualified hostname or IP address of the host to fetch the data from").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    static final PropertyDescriptor UNDEFAULTED_PORT = new PropertyDescriptor.Builder().name("Port").description("The port to connect to on the remote host to fetch the data from").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder().name("Username").description("Username").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    public static final PropertyDescriptor REMOTE_FILENAME = new PropertyDescriptor.Builder().name("Remote File").description("The fully qualified filename on the remote system").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor COMPLETION_STRATEGY = new PropertyDescriptor.Builder().name("Completion Strategy").description("Specifies what to do with the original file on the server once it has been pulled into NiFi. If the Completion Strategy fails, a warning will be logged but the data will still be transferred.").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{COMPLETION_NONE, COMPLETION_MOVE, COMPLETION_DELETE}).defaultValue(COMPLETION_NONE.getValue()).required(true).build();
    static final PropertyDescriptor MOVE_CREATE_DIRECTORY = new PropertyDescriptor.Builder().fromPropertyDescriptor(FileTransfer.CREATE_DIRECTORY).description(String.format("Used when '%s' is '%s'. %s", COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), FileTransfer.CREATE_DIRECTORY.getDescription())).required(false).build();
    static final PropertyDescriptor MOVE_DESTINATION_DIR = new PropertyDescriptor.Builder().name("Move Destination Directory").description(String.format("The directory on the remote server to move the original file to once it has been ingested into NiFi. This property is ignored unless the %s is set to '%s'. The specified directory must already exist on the remote system if '%s' is disabled, or the rename will fail.", COMPLETION_STRATEGY.getDisplayName(), COMPLETION_MOVE.getDisplayName(), MOVE_CREATE_DIRECTORY.getDisplayName())).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).build();
    static final PropertyDescriptor FILE_NOT_FOUND_LOG_LEVEL = new PropertyDescriptor.Builder().displayName("Log level when file not found").name("fetchfiletransfer-notfound-loglevel").description("Log level to use in case the file does not exist when the processor is triggered").allowableValues((Enum[])LogLevel.values()).defaultValue(LogLevel.ERROR.toString()).required(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
    static final Relationship REL_COMMS_FAILURE = new Relationship.Builder().name("comms.failure").description("Any FlowFile that could not be fetched from the remote server due to a communications failure will be transferred to this Relationship.").build();
    static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not.found").description("Any FlowFile for which we receive a 'Not Found' message from the remote server will be transferred to this Relationship.").build();
    static final Relationship REL_PERMISSION_DENIED = new Relationship.Builder().name("permission.denied").description("Any FlowFile that could not be fetched from the remote server due to insufficient permissions will be transferred to this Relationship.").build();
    private final Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> fileTransferMap = new HashMap<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>>();
    private final long IDLE_CONNECTION_MILLIS = TimeUnit.SECONDS.toMillis(10L);
    private volatile long lastClearTime = System.currentTimeMillis();
    private LogLevel levelFileNotFound = LogLevel.ERROR;

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_NOT_FOUND);
        relationships.add(REL_PERMISSION_DENIED);
        relationships.add(REL_COMMS_FAILURE);
        return relationships;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) {
        this.levelFileNotFound = LogLevel.valueOf((String)context.getProperty(FILE_NOT_FOUND_LOG_LEVEL).getValue());
    }

    private void closeConnections(boolean closeNonIdleConnections) {
        for (Map.Entry<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> entry : this.fileTransferMap.entrySet()) {
            FileTransferIdleWrapper wrapper;
            BlockingQueue<FileTransferIdleWrapper> wrapperQueue = entry.getValue();
            ArrayList<FileTransferIdleWrapper> putBack = new ArrayList<FileTransferIdleWrapper>();
            while ((wrapper = (FileTransferIdleWrapper)wrapperQueue.poll()) != null) {
                long lastUsed = wrapper.getLastUsed();
                long nanosSinceLastUse = System.nanoTime() - lastUsed;
                if (!closeNonIdleConnections && TimeUnit.NANOSECONDS.toMillis(nanosSinceLastUse) < this.IDLE_CONNECTION_MILLIS) {
                    putBack.add(wrapper);
                    continue;
                }
                try {
                    wrapper.getFileTransfer().close();
                }
                catch (IOException ioe) {
                    this.getLogger().warn("Failed to close Idle Connection due to {}", new Object[]{ioe}, (Throwable)ioe);
                }
            }
            for (FileTransferIdleWrapper toPutBack : putBack) {
                wrapperQueue.offer(toPutBack);
            }
        }
    }

    @OnStopped
    public void cleanup() {
        this.closeConnections(true);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(HOSTNAME);
        properties.add(UNDEFAULTED_PORT);
        properties.add(REMOTE_FILENAME);
        properties.add(COMPLETION_STRATEGY);
        properties.add(MOVE_DESTINATION_DIR);
        properties.add(MOVE_CREATE_DIRECTORY);
        return properties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        BlockingQueue<FileTransferIdleWrapper> transferQueue;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        String host = context.getProperty(HOSTNAME).evaluateAttributeExpressions(flowFile).getValue();
        int port = context.getProperty(UNDEFAULTED_PORT).evaluateAttributeExpressions(flowFile).asInteger();
        String filename = context.getProperty(REMOTE_FILENAME).evaluateAttributeExpressions(flowFile).getValue();
        Map<Tuple<String, Integer>, BlockingQueue<FileTransferIdleWrapper>> map = this.fileTransferMap;
        synchronized (map) {
            Tuple tuple = new Tuple((Object)host, (Object)port);
            transferQueue = this.fileTransferMap.get(tuple);
            if (transferQueue == null) {
                transferQueue = new LinkedBlockingQueue<FileTransferIdleWrapper>();
                this.fileTransferMap.put((Tuple<String, Integer>)tuple, transferQueue);
            }
            if (System.currentTimeMillis() - this.lastClearTime > this.IDLE_CONNECTION_MILLIS) {
                this.closeConnections(false);
                this.lastClearTime = System.currentTimeMillis();
            }
        }
        FileTransferIdleWrapper transferWrapper = (FileTransferIdleWrapper)transferQueue.poll();
        FileTransfer transfer = transferWrapper == null ? this.createFileTransfer(context) : transferWrapper.getFileTransfer();
        try {
            try {
                flowFile = transfer.getRemoteFile(filename, flowFile, session);
            }
            catch (FileNotFoundException e) {
                this.getLogger().log(this.levelFileNotFound, "Failed to fetch content for {} from filename {} on remote host {} because the file could not be found on the remote system; routing to {}", new Object[]{flowFile, filename, host, REL_NOT_FOUND.getName()});
                session.transfer(session.penalize(flowFile), REL_NOT_FOUND);
                session.getProvenanceReporter().route(flowFile, REL_NOT_FOUND);
                this.cleanupTransfer(transfer, false, transferQueue, host, port);
                return;
            }
            catch (PermissionDeniedException e) {
                this.getLogger().error("Failed to fetch content for {} from filename {} on remote host {} due to insufficient permissions; routing to {}", new Object[]{flowFile, filename, host, REL_PERMISSION_DENIED.getName()});
                session.transfer(session.penalize(flowFile), REL_PERMISSION_DENIED);
                session.getProvenanceReporter().route(flowFile, REL_PERMISSION_DENIED);
                this.cleanupTransfer(transfer, false, transferQueue, host, port);
                return;
            }
            catch (IOException | ProcessException e) {
                this.getLogger().error("Failed to fetch content for {} from filename {} on remote host {}:{} due to {}; routing to comms.failure", new Object[]{flowFile, filename, host, port, e.toString()}, e);
                session.transfer(session.penalize(flowFile), REL_COMMS_FAILURE);
                this.cleanupTransfer(transfer, true, transferQueue, host, port);
                return;
            }
            String protocolName = transfer.getProtocolName();
            HashMap<Object, String> attributes = new HashMap<Object, String>();
            attributes.put(protocolName + ".remote.host", host);
            attributes.put(protocolName + ".remote.port", String.valueOf(port));
            attributes.put(protocolName + ".remote.filename", filename);
            if (filename.contains("/")) {
                String path = StringUtils.substringBeforeLast((String)filename, (String)"/");
                String filenameOnly = StringUtils.substringAfterLast((String)filename, (String)"/");
                attributes.put(CoreAttributes.PATH.key(), path);
                attributes.put(CoreAttributes.FILENAME.key(), filenameOnly);
            } else {
                attributes.put(CoreAttributes.FILENAME.key(), filename);
            }
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.getProvenanceReporter().fetch(flowFile, protocolName + "://" + host + ":" + port + "/" + filename, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
            BlockingQueue<FileTransferIdleWrapper> queue = transferQueue;
            Runnable cleanupTask = () -> this.cleanupTransfer(transfer, false, queue, host, port);
            FlowFile flowFileReceived = flowFile;
            session.commitAsync(() -> {
                this.performCompletionStrategy(transfer, context, flowFileReceived, filename, host, port);
                cleanupTask.run();
            }, t -> cleanupTask.run());
        }
        catch (Throwable t2) {
            this.getLogger().error("Failed to fetch file", t2);
            this.cleanupTransfer(transfer, true, transferQueue, host, port);
        }
    }

    private void cleanupTransfer(FileTransfer transfer, boolean closeConnection, BlockingQueue<FileTransferIdleWrapper> transferQueue, String host, int port) {
        if (closeConnection) {
            this.getLogger().debug("Closing FileTransfer...");
            try {
                transfer.close();
            }
            catch (IOException e) {
                this.getLogger().warn("Failed to close connection to {}:{} due to {}", new Object[]{host, port, e.getMessage()}, (Throwable)e);
            }
        } else {
            this.getLogger().debug("Returning FileTransfer to pool...");
            transferQueue.offer(new FileTransferIdleWrapper(transfer, System.nanoTime()));
        }
    }

    private void performCompletionStrategy(FileTransfer transfer, ProcessContext context, FlowFile flowFile, String filename, String host, int port) {
        String completionStrategy = context.getProperty(COMPLETION_STRATEGY).getValue();
        if (COMPLETION_DELETE.getValue().equalsIgnoreCase(completionStrategy)) {
            try {
                transfer.deleteFile(flowFile, null, filename);
            }
            catch (FileNotFoundException fileNotFoundException) {
            }
            catch (IOException ioe) {
                this.getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to remove the remote file due to {}", new Object[]{flowFile, host, port, filename, ioe}, (Throwable)ioe);
            }
        } else if (COMPLETION_MOVE.getValue().equalsIgnoreCase(completionStrategy)) {
            String targetDir = context.getProperty(MOVE_DESTINATION_DIR).evaluateAttributeExpressions(flowFile).getValue();
            String simpleFilename = StringUtils.substringAfterLast((String)filename, (String)"/");
            try {
                String absoluteTargetDirPath = transfer.getAbsolutePath(flowFile, targetDir);
                File targetFile = new File(absoluteTargetDirPath, simpleFilename);
                if (context.getProperty(MOVE_CREATE_DIRECTORY).asBoolean().booleanValue()) {
                    transfer.ensureDirectoryExists(flowFile, targetFile.getParentFile());
                }
                transfer.rename(flowFile, filename, targetFile.getAbsolutePath());
            }
            catch (IOException ioe) {
                this.getLogger().warn("Successfully fetched the content for {} from {}:{}{} but failed to rename the remote file due to {}", new Object[]{flowFile, host, port, filename, ioe}, (Throwable)ioe);
            }
        }
    }

    protected abstract FileTransfer createFileTransfer(ProcessContext var1);

    private static class FileTransferIdleWrapper {
        private final FileTransfer fileTransfer;
        private final long lastUsed;

        public FileTransferIdleWrapper(FileTransfer fileTransfer, long lastUsed) {
            this.fileTransfer = fileTransfer;
            this.lastUsed = lastUsed;
        }

        public FileTransfer getFileTransfer() {
            return this.fileTransfer;
        }

        public long getLastUsed() {
            return this.lastUsed;
        }
    }
}

