/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard.ftp.commands;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ftpserver.command.AbstractCommand;
import org.apache.ftpserver.ftplet.DataConnection;
import org.apache.ftpserver.ftplet.DefaultFtpReply;
import org.apache.ftpserver.ftplet.FtpException;
import org.apache.ftpserver.ftplet.FtpFile;
import org.apache.ftpserver.ftplet.FtpRequest;
import org.apache.ftpserver.impl.FtpIoSession;
import org.apache.ftpserver.impl.FtpServerContext;
import org.apache.ftpserver.impl.IODataConnectionFactory;
import org.apache.ftpserver.impl.LocalizedDataTransferFtpReply;
import org.apache.ftpserver.impl.LocalizedFtpReply;
import org.apache.ftpserver.impl.ServerDataConnectionFactory;
import org.apache.ftpserver.impl.ServerFtpStatistics;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.standard.ftp.commands.DetailedFtpCommandException;
import org.apache.nifi.processors.standard.ftp.commands.FtpCommandException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FtpCommandSTOR
extends AbstractCommand {
    private static final Logger LOG = LoggerFactory.getLogger(FtpCommandSTOR.class);
    private final AtomicReference<ProcessSessionFactory> sessionFactory;
    private final CountDownLatch sessionFactorySetSignal;
    private final Relationship relationshipSuccess;

    public FtpCommandSTOR(AtomicReference<ProcessSessionFactory> sessionFactory, CountDownLatch sessionFactorySetSignal, Relationship relationshipSuccess) {
        this.sessionFactory = sessionFactory;
        this.sessionFactorySetSignal = sessionFactorySetSignal;
        this.relationshipSuccess = relationshipSuccess;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request) {
        try {
            this.executeCommand(ftpSession, context, request);
        }
        catch (DetailedFtpCommandException ftpCommandException) {
            ftpSession.write((Object)LocalizedDataTransferFtpReply.translate((FtpIoSession)ftpSession, (FtpRequest)request, (FtpServerContext)context, (int)ftpCommandException.getFtpReturnCode(), (String)ftpCommandException.getSubId(), (String)ftpCommandException.getMessage(), (FtpFile)ftpCommandException.getFtpFile()));
        }
        catch (FtpCommandException ftpCommandException) {
            ftpSession.write((Object)new DefaultFtpReply(ftpCommandException.getFtpReturnCode(), ftpCommandException.getMessage()));
        }
        finally {
            ftpSession.resetState();
            ftpSession.getDataConnection().closeDataConnection();
        }
    }

    private void executeCommand(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request) throws FtpCommandException {
        String fileName = this.getArgument(request);
        this.checkDataConnection(ftpSession);
        FtpFile ftpFile = this.getFtpFile(ftpSession, fileName);
        this.checkWritePermission(ftpFile);
        this.sendFileStatusOkay(ftpSession, context, request, ftpFile.getAbsolutePath());
        DataConnection dataConnection = this.openDataConnection(ftpSession, ftpFile);
        this.transferData(dataConnection, ftpSession, context, request, ftpFile);
    }

    private String getArgument(FtpRequest request) throws FtpCommandException {
        String argument = request.getArgument();
        if (argument == null) {
            throw new DetailedFtpCommandException(501, "STOR", null, null);
        }
        return argument;
    }

    private void checkDataConnection(FtpIoSession ftpSession) throws FtpCommandException {
        InetAddress address;
        ServerDataConnectionFactory dataConnectionFactory = ftpSession.getDataConnection();
        if (dataConnectionFactory instanceof IODataConnectionFactory && (address = ((IODataConnectionFactory)dataConnectionFactory).getInetAddress()) == null) {
            throw new FtpCommandException(503, "PORT or PASV must be issued first");
        }
    }

    private FtpFile getFtpFile(FtpIoSession ftpSession, String fileName) throws FtpCommandException {
        FtpFile ftpFile = null;
        try {
            ftpFile = ftpSession.getFileSystemView().getFile(fileName);
        }
        catch (FtpException e) {
            LOG.error("Exception getting file object", (Throwable)e);
        }
        if (ftpFile == null) {
            throw new DetailedFtpCommandException(550, "STOR.invalid", fileName, ftpFile);
        }
        return ftpFile;
    }

    private void checkWritePermission(FtpFile ftpFile) throws FtpCommandException {
        if (!ftpFile.isWritable()) {
            throw new DetailedFtpCommandException(550, "STOR.permission", ftpFile.getAbsolutePath(), ftpFile);
        }
    }

    private void sendFileStatusOkay(FtpIoSession ftpSession, FtpServerContext context, FtpRequest request, String fileAbsolutePath) {
        ftpSession.write((Object)LocalizedFtpReply.translate((FtpIoSession)ftpSession, (FtpRequest)request, (FtpServerContext)context, (int)150, (String)"STOR", (String)fileAbsolutePath)).awaitUninterruptibly(10000L);
    }

    private DataConnection openDataConnection(FtpIoSession ftpSession, FtpFile ftpFile) throws FtpCommandException {
        DataConnection dataConnection;
        try {
            dataConnection = ftpSession.getDataConnection().openConnection();
        }
        catch (Exception exception) {
            LOG.error("Exception getting the input data stream", (Throwable)exception);
            throw new DetailedFtpCommandException(425, "STOR", ftpFile.getAbsolutePath(), ftpFile);
        }
        return dataConnection;
    }

    private void transferData(DataConnection dataConnection, FtpIoSession ftpSession, FtpServerContext context, FtpRequest request, FtpFile ftpFile) throws FtpCommandException {
        ProcessSession processSession;
        try {
            processSession = this.createProcessSession();
        }
        catch (InterruptedException | TimeoutException exception) {
            LOG.error("ProcessSession could not be acquired, command STOR aborted.", (Throwable)exception);
            throw new FtpCommandException(425, "File transfer failed.");
        }
        FlowFile flowFile = processSession.create();
        long transferredBytes = 0L;
        try (OutputStream flowFileOutputStream = processSession.write(flowFile);){
            transferredBytes = dataConnection.transferFromClient(ftpSession.getFtpletSession(), flowFileOutputStream);
            LOG.info("File received {}", (Object)ftpFile.getAbsolutePath());
        }
        catch (SocketException socketException) {
            LOG.error("Socket exception during data transfer", (Throwable)socketException);
            processSession.rollback();
            throw new DetailedFtpCommandException(426, "STOR", ftpFile.getAbsolutePath(), ftpFile);
        }
        catch (IOException ioException) {
            LOG.error("IOException during data transfer", (Throwable)ioException);
            processSession.rollback();
            throw new DetailedFtpCommandException(551, "STOR", ftpFile.getAbsolutePath(), ftpFile);
        }
        try {
            ServerFtpStatistics ftpStat = (ServerFtpStatistics)context.getFtpStatistics();
            ftpStat.setUpload(ftpSession, ftpFile, transferredBytes);
            processSession.putAttribute(flowFile, CoreAttributes.FILENAME.key(), ftpFile.getName());
            processSession.putAttribute(flowFile, CoreAttributes.PATH.key(), this.getPath(ftpFile));
            processSession.getProvenanceReporter().modifyContent(flowFile);
            processSession.transfer(flowFile, this.relationshipSuccess);
        }
        catch (Exception exception) {
            processSession.rollback();
            LOG.error("Process session error. ", (Throwable)exception);
        }
        long byteCount = transferredBytes;
        processSession.commitAsync(() -> ftpSession.write((Object)LocalizedDataTransferFtpReply.translate((FtpIoSession)ftpSession, (FtpRequest)request, (FtpServerContext)context, (int)226, (String)"STOR", (String)ftpFile.getAbsolutePath(), (FtpFile)ftpFile, (long)byteCount)));
    }

    private String getPath(FtpFile ftpFile) {
        String absolutePath = ftpFile.getAbsolutePath();
        int endIndex = absolutePath.length() - ftpFile.getName().length();
        return ftpFile.getAbsolutePath().substring(0, endIndex);
    }

    private ProcessSession createProcessSession() throws InterruptedException, TimeoutException {
        ProcessSessionFactory processSessionFactory = this.getProcessSessionFactory();
        return processSessionFactory.createSession();
    }

    private ProcessSessionFactory getProcessSessionFactory() throws InterruptedException, TimeoutException {
        if (this.sessionFactorySetSignal.await(10000L, TimeUnit.MILLISECONDS)) {
            return this.sessionFactory.get();
        }
        throw new TimeoutException("Waiting period for sessionFactory is over.");
    }
}

