/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.storage;

import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.StringUtils;

/**
 * Processor that writes the content of an incoming FlowFile to Azure Data Lake Storage Gen 2.
 *
 * <p>The content is first uploaded to a uniquely named temporary file located in
 * {@code <Base Temporary Path>/_nifitempdirectory} and is then renamed to the configured
 * directory and file name. The "Conflict Resolution Strategy" property decides what happens
 * when a file with the target name already exists: {@code fail}, {@code replace} or
 * {@code ignore}. Whenever a step after the temporary file creation fails, the processor
 * attempts to delete the temporary file so that no orphans are left behind.</p>
 */
@Tags(value={"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso(value={DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription(value="Puts content into an Azure Data Lake Storage Gen 2")
@WritesAttributes(value={
        @WritesAttribute(attribute="azure.filesystem", description="The name of the Azure File System"),
        @WritesAttribute(attribute="azure.directory", description="The name of the Azure Directory"),
        @WritesAttribute(attribute="azure.filename", description="The name of the Azure File"),
        @WritesAttribute(attribute="azure.primaryUri", description="Primary location for file content"),
        @WritesAttribute(attribute="azure.length", description="The length of the Azure File")})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
public class PutAzureDataLakeStorage
extends AbstractAzureDataLakeStorageProcessor {
    /** Conflict resolution value: route the FlowFile to failure when the target file exists. */
    public static final String FAIL_RESOLUTION = "fail";
    /** Conflict resolution value: overwrite the target file when it exists. */
    public static final String REPLACE_RESOLUTION = "replace";
    /** Conflict resolution value: leave the existing target file untouched and report success. */
    public static final String IGNORE_RESOLUTION = "ignore";

    /**
     * Maximum number of bytes sent in a single append call (100 MiB).
     * Deliberately left non-final so that existing code assigning a different value keeps working.
     */
    public static long MAX_CHUNK_SIZE = 100L * 1024L * 1024L;

    /** Name of the directory, below the base temporary path, that holds in-flight uploads. */
    private static final String TEMP_FILE_DIRECTORY = "_nifitempdirectory";

    /** HTTP status code returned by the service when the rename target already exists. */
    private static final int HTTP_CONFLICT = 409;

    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
            .name("conflict-resolution-strategy")
            .displayName("Conflict Resolution Strategy")
            .description("Indicates what should happen when a file with the same name already exists in the output directory")
            .required(true)
            .defaultValue(FAIL_RESOLUTION)
            .allowableValues(new String[]{FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION})
            .build();

    public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder()
            .name("base-temporary-path")
            .displayName("Base Temporary Path")
            .description("The Path where the temporary directory will be created. The Path name cannot contain a leading '/'. The root directory can be designated by the empty string value. Non-existing directories will be created.The Temporary File Directory name is _nifitempdirectory")
            .defaultValue("")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator((Validator)new AbstractAzureDataLakeStorageProcessor.DirectoryValidator("Base Temporary Path"))
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            ADLS_CREDENTIALS_SERVICE,
            FILESYSTEM,
            DIRECTORY,
            FILE,
            BASE_TEMPORARY_PATH,
            CONFLICT_RESOLUTION,
            AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));

    /**
     * Returns the immutable list of properties supported by this processor.
     *
     * @return the supported property descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Uploads one FlowFile: creates a temporary file, appends the content, makes sure the target
     * directory exists and finally renames the temporary file to its target name.
     *
     * <p>On success the FlowFile is routed to {@code REL_SUCCESS}; the azure.* attributes and the
     * provenance SEND event are only added when a file was actually written (i.e. not when an
     * existing file was ignored). Any exception penalizes the FlowFile and routes it to
     * {@code REL_FAILURE}.</p>
     *
     * @param context the process context used to evaluate the processor properties
     * @param session the session providing the FlowFile
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final long startNanos = System.nanoTime();
        try {
            final String fileSystem = PutAzureDataLakeStorage.evaluateFileSystemProperty(context, flowFile);
            final String originalDirectory = PutAzureDataLakeStorage.evaluateDirectoryProperty(context, flowFile);
            final String tempPath = PutAzureDataLakeStorage.evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH);
            final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY);
            final String fileName = PutAzureDataLakeStorage.evaluateFileNameProperty(context, flowFile);

            final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem);
            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory);
            final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory);
            final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();

            // A random prefix keeps concurrent uploads of equally named files apart.
            final String tempFilePrefix = UUID.randomUUID().toString();
            final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true);

            // appendContent() removes the temporary file itself when the upload fails.
            appendContent(flowFile, tempFileClient, session);

            try {
                createDirectoryIfNotExists(directoryClient);
            } catch (RuntimeException e) {
                // The content is already uploaded at this point; do not leave the temporary file behind.
                removeTempFile(tempFileClient);
                throw e;
            }

            // renameFile() removes the temporary file itself when the rename fails.
            final String fileUrl = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution);
            if (fileUrl != null) {
                final Map<String, String> attributes = createAttributeMap(flowFile, fileSystem, originalDirectory, fileName, fileUrl);
                flowFile = session.putAllAttributes(flowFile, attributes);
                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                session.getProvenanceReporter().send(flowFile, fileUrl, transferMillis);
            }
            session.transfer(flowFile, REL_SUCCESS);
        }
        catch (Exception e) {
            getLogger().error("Failed to create file on Azure Data Lake Storage", (Throwable)e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Obtains the client for the given file system from the storage client of this processor.
     */
    private DataLakeFileSystemClient getFileSystemClient(ProcessContext context, FlowFile flowFile, String fileSystem) {
        final DataLakeServiceClient storageClient = getStorageClient((PropertyContext)context, flowFile);
        return storageClient.getFileSystemClient(fileSystem);
    }

    /**
     * Builds the azure.* attributes that are written to a successfully uploaded FlowFile.
     */
    private Map<String, String> createAttributeMap(FlowFile flowFile, String fileSystem, String originalDirectory, String fileName, String fileUrl) {
        final Map<String, String> attributes = new HashMap<String, String>();
        attributes.put("azure.filesystem", fileSystem);
        attributes.put("azure.directory", originalDirectory);
        attributes.put("azure.filename", fileName);
        attributes.put("azure.primaryUri", fileUrl);
        attributes.put("azure.length", String.valueOf(flowFile.getSize()));
        return attributes;
    }

    /**
     * Creates the directory unless it is the root (empty path) or already exists.
     */
    private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) {
        if (directoryClient.getDirectoryPath().isEmpty()) {
            return;
        }
        if (!directoryClient.exists()) {
            directoryClient.create();
        }
    }

    /**
     * Uploads the FlowFile content into the given file. Empty FlowFiles are skipped because the
     * file has already been created by the caller. If the upload fails, the file is removed
     * (best effort) and the original exception is rethrown.
     *
     * @param flowFile   the FlowFile whose content is uploaded
     * @param fileClient the client of the (temporary) target file
     * @param session    the session used to read the FlowFile content
     * @throws IOException if reading the FlowFile content fails
     */
    void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, ProcessSession session) throws IOException {
        final long length = flowFile.getSize();
        if (length <= 0L) {
            return;
        }
        try (InputStream rawIn = session.read(flowFile);
             BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
            PutAzureDataLakeStorage.uploadContent(fileClient, bufferedIn, length);
        }
        catch (Exception e) {
            removeTempFile(fileClient);
            throw e;
        }
    }

    /**
     * Appends {@code length} bytes from the stream to the file in chunks of at most
     * {@link #MAX_CHUNK_SIZE} bytes and then flushes the file.
     *
     * @param fileClient the client of the file to write to
     * @param in         the content stream; it is not closed by this method
     * @param length     the total number of bytes to upload
     */
    static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
        long chunkStart = 0L;
        while (chunkStart < length) {
            final long chunkSize = Math.min(length - chunkStart, MAX_CHUNK_SIZE);
            // Restrict each append call to exactly one chunk of the shared stream.
            final BoundedInputStream boundedIn = new BoundedInputStream(in, chunkSize);
            fileClient.append((InputStream)boundedIn, chunkStart, chunkSize);
            chunkStart += chunkSize;
        }
        // Second argument requests overwrite mode for the flush.
        fileClient.flush(length, true);
    }

    /**
     * Renames the temporary file to its final destination, honouring the conflict resolution
     * strategy. Unless the strategy is {@code replace}, the rename is conditional on the
     * destination not existing. On any failure the temporary file is removed (best effort).
     *
     * @param sourceFileClient     the client of the temporary file
     * @param destinationDirectory the target directory; may be blank for the root
     * @param destinationFileName  the target file name
     * @param conflictResolution   one of the *_RESOLUTION values
     * @return the URL of the renamed file, or {@code null} when the destination already existed
     *         and the strategy is {@code ignore}
     * @throws ProcessException if the service rejects the rename for any other reason, or
     *                          because of a conflict with the {@code fail} strategy
     */
    String renameFile(DataLakeFileClient sourceFileClient, String destinationDirectory, String destinationFileName, String conflictResolution) {
        try {
            final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
            if (!REPLACE_RESOLUTION.equals(conflictResolution)) {
                // "If-None-Match: *" makes the rename fail instead of overwriting an existing file.
                destinationCondition.setIfNoneMatch("*");
            }
            final String destinationPath = createPath(destinationDirectory, destinationFileName);
            return ((DataLakeFileClient)sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue()).getFileUrl();
        }
        catch (DataLakeStorageException dataLakeStorageException) {
            removeTempFile(sourceFileClient);
            final boolean destinationExists = dataLakeStorageException.getStatusCode() == HTTP_CONFLICT;
            if (destinationExists && IGNORE_RESOLUTION.equals(conflictResolution)) {
                getLogger().info("File with the same name [{}] already exists. Remote file not modified due to {} being set to '{}'.", new Object[]{sourceFileClient.getFileName(), CONFLICT_RESOLUTION.getDisplayName(), conflictResolution});
                return null;
            }
            if (destinationExists && FAIL_RESOLUTION.equals(conflictResolution)) {
                throw new ProcessException(String.format("File with the same name [%s] already exists.", sourceFileClient.getFileName()), (Throwable)dataLakeStorageException);
            }
            throw new ProcessException(String.format("Renaming File [%s] failed", sourceFileClient.getFileName()), (Throwable)dataLakeStorageException);
        }
        catch (RuntimeException e) {
            // Any other failure would otherwise leave the temporary file behind.
            removeTempFile(sourceFileClient);
            throw e;
        }
    }

    /**
     * Joins a base directory and a path with '/', or returns the path alone when the base is blank.
     */
    private String createPath(String baseDirectory, String path) {
        if (StringUtils.isNotBlank((String)baseDirectory)) {
            return baseDirectory + "/" + path;
        }
        return path;
    }

    /**
     * Deletes the temporary file on a best-effort basis. A failure is only logged so that it
     * never masks the exception that triggered the clean-up.
     */
    private void removeTempFile(DataLakeFileClient fileClient) {
        try {
            fileClient.delete();
        }
        catch (Exception e) {
            getLogger().error("Removing temporary File [{}] failed", new Object[]{fileClient.getFileName(), e});
        }
    }
}

