/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.storage;

import com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
import com.azure.storage.file.datalake.models.DataLakeStorageException;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.fileresource.service.api.FileResource;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
import org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.ListAzureDataLakeStorage;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.processors.azure.storage.utils.WritingStrategy;
import org.apache.nifi.processors.transfer.ResourceTransferProperties;
import org.apache.nifi.processors.transfer.ResourceTransferSource;
import org.apache.nifi.processors.transfer.ResourceTransferUtils;
import org.apache.nifi.util.StringUtils;

@Tags(value={"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"})
@SeeAlso(value={DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class})
@CapabilityDescription(value="Writes the contents of a FlowFile as a file on Azure Data Lake Storage Gen 2")
@WritesAttributes(value={@WritesAttribute(attribute="azure.filesystem", description="The name of the Azure File System"), @WritesAttribute(attribute="azure.directory", description="The name of the Azure Directory"), @WritesAttribute(attribute="azure.filename", description="The name of the Azure File"), @WritesAttribute(attribute="azure.primaryUri", description="Primary location for file content"), @WritesAttribute(attribute="azure.length", description="The length of the Azure File")})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
/**
 * Processor that uploads the content of a FlowFile (or of an external file resource) to a file
 * on Azure Data Lake Storage Gen 2.
 *
 * <p>Two writing strategies are supported: writing to a temporary file that is renamed to the
 * final destination afterwards, or creating and writing the destination file directly. What
 * happens when the destination already exists is controlled by {@link #CONFLICT_RESOLUTION}.
 */
public class PutAzureDataLakeStorage
extends AbstractAzureDataLakeStorageProcessor {
    /** Conflict resolution: an already existing destination file routes the FlowFile to failure. */
    public static final String FAIL_RESOLUTION = "fail";
    /** Conflict resolution: an already existing destination file is overwritten. */
    public static final String REPLACE_RESOLUTION = "replace";
    /** Conflict resolution: an already existing destination file is left untouched. */
    public static final String IGNORE_RESOLUTION = "ignore";

    /**
     * Maximum number of bytes sent in a single append call (100 MiB).
     * NOTE(review): kept public and non-final because that is the existing interface; presumably
     * tests lower it to exercise chunking - confirm before tightening.
     */
    public static long MAX_CHUNK_SIZE = 100L * 1024L * 1024L;

    public static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
            .name("conflict-resolution-strategy")
            .displayName("Conflict Resolution Strategy")
            .description("Indicates what should happen when a file with the same name already exists in the output directory")
            .required(true)
            .defaultValue(FAIL_RESOLUTION)
            .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
            .build();

    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
            .name("writing-strategy")
            .displayName("Writing Strategy")
            .description("Defines the approach for writing the Azure file.")
            .required(true)
            .allowableValues(WritingStrategy.class)
            .defaultValue(WritingStrategy.WRITE_AND_RENAME.getValue())
            .build();

    public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder()
            .name("base-temporary-path")
            .displayName("Base Temporary Path")
            .description("The Path where the temporary directory will be created. The Path name cannot contain a leading '/'. The root directory can be designated by the empty string value. Non-existing directories will be created.The Temporary File Directory name is _nifitempdirectory")
            .defaultValue("")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(new AbstractAzureDataLakeStorageProcessor.DirectoryValidator("Base Temporary Path"))
            .dependsOn(WRITING_STRATEGY, WritingStrategy.WRITE_AND_RENAME)
            .build();

    private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
            ADLS_CREDENTIALS_SERVICE,
            FILESYSTEM,
            DIRECTORY,
            FILE,
            WRITING_STRATEGY,
            BASE_TEMPORARY_PATH,
            CONFLICT_RESOLUTION,
            ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE,
            ResourceTransferProperties.FILE_RESOURCE_SERVICE,
            AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));

    /** HTTP status code that is treated as "destination file already exists". */
    private static final int STATUS_CONFLICT = 409;

    /**
     * @return the fixed, unmodifiable list of properties supported by this processor
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Takes one FlowFile from the session and writes its content to the configured destination.
     *
     * <p>On success the FlowFile is routed to {@code REL_SUCCESS}; when a file was actually written
     * the Azure attributes are added and a provenance SEND event is reported. When the write is
     * skipped because of the {@code ignore} conflict resolution, the FlowFile is still routed to
     * {@code REL_SUCCESS}, but without attributes or provenance event. Any exception penalizes the
     * FlowFile and routes it to {@code REL_FAILURE}.
     *
     * @param context the process context providing the property values
     * @param session the session the FlowFile is taken from and transferred with
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        final long startNanos = System.nanoTime();
        try {
            final String fileSystem = evaluateFileSystemProperty(context, flowFile);
            final String directory = evaluateDirectoryProperty(context, flowFile);
            final String fileName = evaluateFileNameProperty(context, flowFile);
            final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem);
            final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory);
            final WritingStrategy writingStrategy = WritingStrategy.valueOf(context.getProperty(WRITING_STRATEGY).getValue());
            final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue();
            final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE).getValue());
            final Optional<FileResource> fileResourceFound = ResourceTransferUtils.getFileResource(resourceTransferSource, context, flowFile.getAttributes());
            // The external file resource, when present, defines the size; otherwise the FlowFile content does.
            final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize());

            final DataLakeFileClient fileClient;
            if (writingStrategy == WritingStrategy.WRITE_AND_RENAME) {
                final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH);
                final String tempDirectory = createPath(tempPath, "_nifitempdirectory");
                // Random prefix keeps temporary files of different FlowFiles with the same file name apart.
                final String tempFilePrefix = UUID.randomUUID().toString();
                final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory);
                final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
                // uploadFile() removes the temporary file itself when the upload fails.
                uploadFile(session, flowFile, fileResourceFound, transferSize, tempFileClient);
                try {
                    createDirectoryIfNotExists(directoryClient);
                } catch (RuntimeException e) {
                    // Do not leave the already uploaded temporary file behind when the
                    // destination directory cannot be prepared.
                    removeFile(tempFileClient);
                    throw e;
                }
                // renameFile() removes the temporary file itself when the rename is rejected.
                fileClient = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution);
            } else {
                fileClient = createFile(directoryClient, fileName, conflictResolution);
                if (fileClient != null) {
                    uploadFile(session, flowFile, fileResourceFound, transferSize, fileClient);
                }
            }

            // A null client means the destination existed and the conflict resolution is "ignore".
            if (fileClient != null) {
                final String fileUrl = fileClient.getFileUrl();
                final Map<String, String> attributes = createAttributeMap(fileSystem, directory, fileName, fileUrl, transferSize);
                flowFile = session.putAllAttributes(flowFile, attributes);
                final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                session.getProvenanceReporter().send(flowFile, fileUrl, transferMillis);
            }
            session.transfer(flowFile, REL_SUCCESS);
        } catch (Exception e) {
            getLogger().error("Failed to create file on Azure Data Lake Storage", e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Resolves the client of the given file system through the storage client of this processor.
     */
    private DataLakeFileSystemClient getFileSystemClient(ProcessContext context, FlowFile flowFile, String fileSystem) {
        final DataLakeServiceClient storageClient = getStorageClient(context, flowFile);
        return storageClient.getFileSystemClient(fileSystem);
    }

    /**
     * Builds the {@code azure.*} attributes that are written to a successfully uploaded FlowFile.
     */
    private Map<String, String> createAttributeMap(String fileSystem, String originalDirectory, String fileName, String fileUrl, long length) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("azure.filesystem", fileSystem);
        attributes.put("azure.directory", originalDirectory);
        attributes.put("azure.filename", fileName);
        attributes.put("azure.primaryUri", fileUrl);
        attributes.put("azure.length", String.valueOf(length));
        return attributes;
    }

    /**
     * Creates the directory unless it is the root (empty path) or it already exists.
     */
    private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) {
        if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists().booleanValue()) {
            directoryClient.create();
        }
    }

    /**
     * Uploads the content of the file resource, or of the FlowFile when no file resource is
     * present, to the given file. Nothing is uploaded when {@code transferSize} is zero.
     *
     * <p>When anything fails, the target file is removed (best effort) before the exception is
     * rethrown.
     *
     * @throws Exception whatever opening, uploading or closing the content stream raised
     */
    private void uploadFile(ProcessSession session, FlowFile flowFile, Optional<FileResource> fileResourceFound, long transferSize, DataLakeFileClient fileClient) throws Exception {
        if (transferSize <= 0L) {
            return;
        }
        try (InputStream inputStream = new BufferedInputStream(fileResourceFound.map(FileResource::getInputStream).orElseGet(() -> session.read(flowFile)))) {
            uploadContent(fileClient, inputStream, transferSize);
        } catch (Exception e) {
            removeFile(fileClient);
            throw e;
        }
    }

    /**
     * Appends {@code length} bytes read from {@code in} to the file in chunks of at most
     * {@link #MAX_CHUNK_SIZE} bytes and flushes the file afterwards.
     *
     * @param fileClient client of the file that receives the content
     * @param in stream positioned at the first byte to upload
     * @param length total number of bytes to upload
     * @throws IllegalStateException if there is content to upload but {@link #MAX_CHUNK_SIZE}
     *         is not positive (the chunk loop could never make progress)
     */
    static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) {
        // Read the mutable static once so that the whole upload uses one consistent chunk size.
        final long maxChunkSize = MAX_CHUNK_SIZE;
        if (length > 0L && maxChunkSize <= 0L) {
            throw new IllegalStateException("MAX_CHUNK_SIZE must be positive but was " + maxChunkSize);
        }
        long chunkStart = 0L;
        while (chunkStart < length) {
            final long chunkSize = Math.min(length - chunkStart, maxChunkSize);
            // Bound the shared stream so that a single append cannot read beyond its own chunk.
            final BoundedInputStream boundedIn = new BoundedInputStream(in, chunkSize);
            fileClient.append(boundedIn, chunkStart, chunkSize);
            chunkStart += chunkSize;
        }
        // NOTE(review): the second argument is presumably the SDK's overwrite flag - confirm
        // against the DataLakeFileClient.flush documentation.
        fileClient.flush(length, true);
    }

    /**
     * Creates the destination file directly in the given directory.
     *
     * @param directoryClient client of the destination directory
     * @param fileName name of the file to create
     * @param conflictResolution one of the {@code *_RESOLUTION} values; an existing file is only
     *        overwritten for {@link #REPLACE_RESOLUTION}
     * @return the client of the created file, or {@code null} when the file already exists and
     *         the conflict resolution is {@link #IGNORE_RESOLUTION}
     * @throws ProcessException if the storage rejects the creation for any other reason
     */
    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, String fileName, String conflictResolution) {
        final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
        try {
            final boolean overwrite = REPLACE_RESOLUTION.equals(conflictResolution);
            return directoryClient.createFile(fileName, overwrite);
        } catch (DataLakeStorageException e) {
            return handleDataLakeStorageException(e, destinationPath, conflictResolution);
        }
    }

    /**
     * Renames (moves) the source file to the destination path.
     *
     * <p>Unless the conflict resolution is {@link #REPLACE_RESOLUTION}, the rename is sent with an
     * {@code If-None-Match: *} condition on the destination. When the storage rejects the rename,
     * the source file is removed (best effort) before the failure is evaluated.
     *
     * @return the client of the renamed file, or {@code null} when the destination already exists
     *         and the conflict resolution is {@link #IGNORE_RESOLUTION}
     * @throws ProcessException if the storage rejects the rename for any other reason
     */
    DataLakeFileClient renameFile(DataLakeFileClient sourceFileClient, String destinationDirectory, String destinationFileName, String conflictResolution) {
        final String destinationPath = createPath(destinationDirectory, destinationFileName);
        try {
            final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
            if (!REPLACE_RESOLUTION.equals(conflictResolution)) {
                destinationCondition.setIfNoneMatch("*");
            }
            return (DataLakeFileClient) sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue();
        } catch (DataLakeStorageException e) {
            removeFile(sourceFileClient);
            return handleDataLakeStorageException(e, destinationPath, conflictResolution);
        }
    }

    /**
     * Translates a storage exception according to the conflict resolution.
     *
     * @return {@code null} when the status code is 409 and the conflict resolution is
     *         {@link #IGNORE_RESOLUTION}; never returns normally otherwise
     * @throws ProcessException in every other case, wrapping the storage exception
     */
    private DataLakeFileClient handleDataLakeStorageException(DataLakeStorageException dataLakeStorageException, String destinationPath, String conflictResolution) {
        final boolean fileAlreadyExists = dataLakeStorageException.getStatusCode() == STATUS_CONFLICT;
        if (fileAlreadyExists && IGNORE_RESOLUTION.equals(conflictResolution)) {
            getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", new Object[]{destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution});
            return null;
        }
        if (fileAlreadyExists && FAIL_RESOLUTION.equals(conflictResolution)) {
            throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException);
        }
        throw new ProcessException(String.format("File operation failed [%s]", destinationPath), dataLakeStorageException);
    }

    /**
     * Joins a base directory and a path with {@code /}; a blank base directory yields the path alone.
     */
    private String createPath(String baseDirectory, String path) {
        if (StringUtils.isNotBlank(baseDirectory)) {
            return baseDirectory + "/" + path;
        }
        return path;
    }

    /**
     * Deletes the file on a best-effort basis: a failing delete is logged and not propagated, so
     * that it cannot mask the exception that triggered the clean-up.
     */
    private void removeFile(DataLakeFileClient fileClient) {
        try {
            fileClient.delete();
        } catch (Exception e) {
            getLogger().error("Failed to delete File [{}]", new Object[]{fileClient.getFileName(), e});
        }
    }
}

