/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.storage;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.AbstractGCSProcessor;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.ListGCSBucket;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.processors.gcp.storage.StorageAttributes;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"google cloud", "google", "storage", "gcs", "fetch"})
@CapabilityDescription(value="Fetches a file from a Google Cloud Bucket. Designed to be used in tandem with ListGCSBucket.")
@SeeAlso(value={ListGCSBucket.class, PutGCSObject.class, DeleteGCSObject.class})
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file, parsed if possible from the Content-Disposition response header"), @WritesAttribute(attribute="gcs.bucket", description="Bucket of the object."), @WritesAttribute(attribute="gcs.key", description="Name of the object."), @WritesAttribute(attribute="gcs.size", description="Size of the object."), @WritesAttribute(attribute="gcs.cache.control", description="Data cache control of the object."), @WritesAttribute(attribute="gcs.component.count", description="The number of components which make up the object."), @WritesAttribute(attribute="gcs.content.disposition", description="The data content disposition of the object."), @WritesAttribute(attribute="gcs.content.encoding", description="The content encoding of the object."), @WritesAttribute(attribute="gcs.content.language", description="The content language of the object."), @WritesAttribute(attribute="mime.type", description="The MIME/Content-Type of the object"), @WritesAttribute(attribute="gcs.crc32c", description="The CRC32C checksum of object's data, encoded in base64 in big-endian order."), @WritesAttribute(attribute="gcs.create.time", description="The creation time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.update.time", description="The last modification time of the object (milliseconds)"), @WritesAttribute(attribute="gcs.encryption.algorithm", description="The algorithm used to encrypt the object."), @WritesAttribute(attribute="gcs.encryption.sha256", description="The SHA256 hash of the key used to encrypt the object"), @WritesAttribute(attribute="gcs.etag", description="The HTTP 1.1 Entity tag for the object."), @WritesAttribute(attribute="gcs.generated.id", description="The service-generated for the object"), @WritesAttribute(attribute="gcs.generation", description="The data generation of the object."), @WritesAttribute(attribute="gcs.md5", description="The MD5 hash of the object's data encoded in base64."), @WritesAttribute(attribute="gcs.media.link", description="The media download link to the object."), @WritesAttribute(attribute="gcs.metageneration", description="The metageneration of the object."), @WritesAttribute(attribute="gcs.owner", description="The owner (uploader) of the object."), @WritesAttribute(attribute="gcs.owner.type", description="The ACL entity type of the uploader of the object."), @WritesAttribute(attribute="gcs.uri", description="The URI of the object as a string.")})
public class FetchGCSObject
extends AbstractGCSProcessor {
    public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder().name("gcs-bucket").displayName("Bucket").description("Bucket of the object.").required(true).defaultValue("${gcs.bucket}").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder().name("gcs-key").displayName("Name").description("Name of the object.").required(true).defaultValue("${" + CoreAttributes.FILENAME.key() + "}").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor GENERATION = new PropertyDescriptor.Builder().name("gcs-generation").displayName("Object Generation").description("The generation of the Object to download. If not set, the latest generation will be downloaded.").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR).required(false).build();
    public static final PropertyDescriptor ENCRYPTION_KEY = new PropertyDescriptor.Builder().name("gcs-server-side-encryption-key").displayName("Server Side Encryption Key").description("An AES256 Key (encoded in base64) which the object has been encrypted in.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).sensitive(true).build();
    public static final PropertyDescriptor RANGE_START = new PropertyDescriptor.Builder().name("gcs-object-range-start").displayName("Range Start").description("The byte position at which to start reading from the object. An empty value or a value of zero will start reading at the beginning of the object.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();
    public static final PropertyDescriptor RANGE_LENGTH = new PropertyDescriptor.Builder().name("gcs-object-range-length").displayName("Range Length").description("The number of bytes to download from the object, starting from the Range Start. An empty value or a value that extends beyond the end of the object will read to the end of the object.").addValidator(StandardValidators.createDataSizeBoundsValidator((long)1L, (long)Long.MAX_VALUE)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).build();

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(BUCKET);
        descriptors.add(KEY);
        descriptors.addAll(super.getSupportedPropertyDescriptors());
        descriptors.add(GENERATION);
        descriptors.add(ENCRYPTION_KEY);
        descriptors.add(RANGE_START);
        descriptors.add(RANGE_LENGTH);
        return Collections.unmodifiableList(descriptors);
    }

    @Override
    protected List<String> getRequiredPermissions() {
        return Collections.singletonList("storage.objects.get");
    }

    @Override
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        ArrayList<ConfigVerificationResult> results = new ArrayList<ConfigVerificationResult>(super.verify(context, verificationLogger, attributes));
        String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
        String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
        Storage storage = (Storage)this.getCloudService(context);
        try {
            FetchedBlob blob = this.fetchBlob(context, storage, attributes);
            CountingOutputStream out = new CountingOutputStream((OutputStream)NullOutputStream.NULL_OUTPUT_STREAM);
            IOUtils.copy((InputStream)blob.contents, (OutputStream)out);
            long byteCount = out.getByteCount();
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Fetch GCS Blob").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Successfully fetched [%s] from Bucket [%s], totaling %s bytes", key, bucketName, byteCount)).build());
        }
        catch (StorageException | IOException e) {
            this.getLogger().error(String.format("Failed to fetch [%s] from Bucket [%s]", key, bucketName), e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Fetch GCS Blob").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to fetch [%s] from Bucket [%s]: %s", key, bucketName, e.getMessage())).build());
        }
        return results;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        long startNanos = System.nanoTime();
        String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
        String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
        Storage storage = (Storage)this.getCloudService();
        long rangeStart = context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L;
        Long rangeLength = context.getProperty(RANGE_LENGTH).isSet() ? Long.valueOf(context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()) : null;
        try {
            FetchedBlob blob = this.fetchBlob(context, storage, flowFile.getAttributes());
            flowFile = session.importFrom(blob.contents, flowFile);
            Map<String, String> attributes = StorageAttributes.createAttributes(blob.blob);
            flowFile = session.putAllAttributes(flowFile, attributes);
        }
        catch (StorageException | IOException e) {
            this.getLogger().error("Failed to fetch GCS Object due to {}", new Object[]{e}, e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        session.transfer(flowFile, REL_SUCCESS);
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        this.getLogger().info("Successfully retrieved GCS Object for {} in {} millis; routing to success", new Object[]{flowFile, millis});
        session.getProvenanceReporter().fetch(flowFile, "https://" + bucketName + ".storage.googleapis.com/" + key, millis);
    }

    private FetchedBlob fetchBlob(ProcessContext context, Storage storage, Map<String, String> attributes) throws IOException {
        String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
        String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
        Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
        long rangeStart = context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : 0L;
        Long rangeLength = context.getProperty(RANGE_LENGTH).isSet() ? Long.valueOf(context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue()) : null;
        BlobId blobId = BlobId.of((String)bucketName, (String)key, (Long)generation);
        List<Storage.BlobSourceOption> blobSourceOptions = this.getBlobSourceOptions(context, attributes);
        if (blobId.getName() == null || blobId.getName().isEmpty()) {
            throw new IllegalArgumentException("Name is required");
        }
        Blob blob = storage.get(blobId);
        if (blob == null) {
            throw new StorageException(404, "Blob " + blobId + " not found");
        }
        if (rangeStart > 0L && rangeStart >= blob.getSize()) {
            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug("Start position: {}, blob size: {}", new Object[]{rangeStart, blob.getSize()});
            }
            throw new StorageException(416, "The range specified is not valid for the blob " + blob.getBlobId() + ". Range Start is beyond the end of the blob.");
        }
        ReadChannel reader = storage.reader(blob.getBlobId(), blobSourceOptions.toArray(new Storage.BlobSourceOption[0]));
        reader.seek(rangeStart);
        InputStream rawInputStream = Channels.newInputStream((ReadableByteChannel)reader);
        InputStream blobContents = rangeLength == null ? rawInputStream : new BoundedInputStream(rawInputStream, rangeLength.longValue());
        return new FetchedBlob(blobContents, blob);
    }

    private List<Storage.BlobSourceOption> getBlobSourceOptions(ProcessContext context, Map<String, String> attributes) {
        Long generation = context.getProperty(GENERATION).evaluateAttributeExpressions(attributes).asLong();
        String encryptionKey = context.getProperty(ENCRYPTION_KEY).evaluateAttributeExpressions(attributes).getValue();
        ArrayList<Storage.BlobSourceOption> blobSourceOptions = new ArrayList<Storage.BlobSourceOption>(2);
        if (encryptionKey != null) {
            blobSourceOptions.add(Storage.BlobSourceOption.decryptionKey((String)encryptionKey));
        }
        if (generation != null) {
            blobSourceOptions.add(Storage.BlobSourceOption.generationMatch());
        }
        return blobSourceOptions;
    }

    private class FetchedBlob {
        private final InputStream contents;
        private final Blob blob;

        private FetchedBlob(InputStream contents, Blob blob) {
            this.contents = contents;
            this.blob = blob;
        }
    }
}

