/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub.lite;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auth.Credentials;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite;
import org.threeten.bp.Duration;

/**
 * NiFi processor that publishes the content of incoming FlowFiles to a Google Cloud
 * PubSub Lite topic.
 *
 * <p>Each FlowFile becomes one {@link PubsubMessage}: the FlowFile content is the message
 * payload, every dynamic property becomes a message attribute, and the optional
 * {@link #ORDERING_KEY} is applied when it evaluates to a non-null value. FlowFiles are
 * processed in batches; if waiting for the publish results of a batch fails, the whole
 * batch is routed to failure.
 *
 * <p>The {@link Publisher} is created and started in {@link #onScheduled(ProcessContext)}
 * and stopped and discarded in {@link #onStopped()}.
 */
@SeeAlso(value={ConsumeGCPubSubLite.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
@CapabilityDescription(value="Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties. If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default Credentials' or 'Use Compute Engine Credentials'.")
@DynamicProperty(name="Attribute name", value="Value to be set to the attribute", description="Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.messageId", description="ID of the pubsub message published to the configured Google Cloud PubSub topic"), @WritesAttribute(attribute="gcp.pubsub.topic", description="Name of the Google Cloud PubSub topic the message was published to")})
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="The entirety of the FlowFile's content will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSubLite
extends AbstractGCPubSubProcessor
implements VerifiableProcessor {
    /** Full path of the PubSub Lite topic to publish to. */
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder().name("gcp-pubsub-topic").displayName("Topic Name").description("Name of the Google Cloud PubSub Topic. Example: projects/8476107443/locations/europe-west1-d/topics/my-lite-topic").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    /** Optional per-FlowFile ordering key set on the outgoing message. */
    public static final PropertyDescriptor ORDERING_KEY = new PropertyDescriptor.Builder().name("gcp-ordering-key").displayName("Ordering Key").description("Messages with the same ordering key will always get published to the same partition. When this property is not set, messages can get published to different partitions if more than one partition exists for the topic.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    /** Request byte threshold handed to the publisher's batching settings. */
    public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor.Builder().name("gcp-batch-bytes").displayName("Batch Bytes Threshold").description("Publish request gets triggered based on this Batch Bytes Threshold property and the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.").required(true).defaultValue("3 MB").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).build();

    /** FlowFile attribute receiving the ID returned for the published message. */
    private static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
    /** FlowFile attribute receiving the topic the message was published to. */
    private static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";

    /** Publisher created in {@link #onScheduled(ProcessContext)}; {@code null} while unscheduled. */
    private Publisher publisher = null;

    /**
     * Returns the fixed (non-dynamic) properties of this processor as an unmodifiable list.
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.unmodifiableList(Arrays.asList(TOPIC_NAME, GCP_CREDENTIALS_PROVIDER_SERVICE, ORDERING_KEY, BATCH_SIZE, BATCH_BYTES));
    }

    /**
     * Builds the descriptor for a user-defined property. Every dynamic property is optional,
     * supports FlowFile-attribute expression language and is later sent as a message attribute.
     *
     * @param propertyDescriptorName name the user gave the dynamic property
     * @return descriptor for that dynamic property
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .required(false)
                .name(propertyDescriptorName)
                .displayName(propertyDescriptorName)
                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
                .dynamic(true)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .build();
    }

    /**
     * Returns the success and failure relationships as an unmodifiable set.
     */
    @Override
    public Set<Relationship> getRelationships() {
        return Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
    }

    /**
     * Checks that the configured topic can be parsed as a PubSub Lite topic path.
     *
     * @param validationContext context providing the configured property values
     * @return a single invalid result when the topic cannot be parsed, otherwise an empty collection
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> results = new ArrayList<ValidationResult>(1);
        final String topic = validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        try {
            TopicPath.parse(topic);
        } catch (final ApiException e) {
            results.add(new ValidationResult.Builder()
                    .subject(TOPIC_NAME.getName())
                    .input(topic)
                    .valid(false)
                    .explanation("The Topic does not have a valid format.")
                    .build());
        }
        return results;
    }

    /**
     * Creates the publisher (if none exists yet) and starts it, blocking until it is running.
     *
     * <p>If the start fails, the publisher reference is dropped so that a later scheduling
     * attempt builds a fresh instance instead of calling {@code startAsync()} again on the
     * instance that already failed to start.
     *
     * @param context context providing the configured property values
     * @throws ProcessException if the publisher cannot be created or started
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        try {
            if (this.publisher == null) {
                this.publisher = this.getPublisher(context);
            }
        } catch (final Exception e) {
            this.getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", (Throwable) e);
            throw new ProcessException((Throwable) e);
        }

        try {
            this.publisher.startAsync().awaitRunning();
        } catch (final Exception e) {
            // Prefer the service's own failure cause for the log, but fall back to the caught
            // exception when the service is not in the FAILED state.
            final Throwable serviceFailure = failureCauseOrNull(this.publisher);
            this.getLogger().error("Failed to start Google Cloud PubSub Lite Publisher", serviceFailure != null ? serviceFailure : e);
            this.publisher = null;
            throw new ProcessException((Throwable) e);
        }
    }

    /**
     * Publishes a batch of up to {@code BATCH_SIZE} FlowFiles.
     *
     * <p>On success every FlowFile is tagged with {@code gcp.pubsub.topic} and
     * {@code gcp.pubsub.messageId}, transferred to success, and a provenance SEND event is
     * reported. If waiting for the publish results fails, the whole batch is transferred to
     * failure and the processor yields.
     *
     * @param context context providing the configured property values
     * @param session session used to read, update and route the FlowFiles
     * @throws ProcessException if a publisher exists but is not running
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        // Check the publisher before pulling any FlowFiles so that nothing is taken from the
        // queue when there is no way to publish it.
        final Publisher activePublisher = this.publisher;
        if (activePublisher == null) {
            this.getLogger().error("Google Cloud PubSub Lite Publisher was not properly created. Yielding the processor...");
            context.yield();
            return;
        }
        if (!activePublisher.isRunning()) {
            final Throwable cause = failureCauseOrNull(activePublisher);
            if (cause == null) {
                this.getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...");
            } else {
                this.getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...", cause);
            }
            throw new ProcessException("Google Cloud PubSub Lite Publisher is not running", cause);
        }

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final List<FlowFile> flowFiles = session.get(batchSize);
        if (flowFiles.isEmpty()) {
            context.yield();
            return;
        }

        final long startNanos = System.nanoTime();
        final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        // 'pending' holds the FlowFile references returned by putAttribute; these (not the
        // references originally pulled from the session) are the ones routed afterwards.
        final List<FlowFile> pending = new ArrayList<FlowFile>(flowFiles.size());
        final List<ApiFuture<String>> futures = new ArrayList<ApiFuture<String>>(flowFiles.size());

        for (final FlowFile flowFile : flowFiles) {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            session.exportTo(flowFile, (OutputStream) baos);
            final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());

            final String orderingKey = context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
            final PubsubMessage.Builder message = PubsubMessage.newBuilder()
                    .setData(flowFileContent)
                    .setPublishTime(Timestamp.newBuilder().build())
                    .putAllAttributes(this.getDynamicAttributesMap(context, flowFile));
            if (orderingKey != null) {
                message.setOrderingKey(orderingKey);
            }

            futures.add(activePublisher.publish(message.build()));
            pending.add(session.putAttribute(flowFile, TOPIC_NAME_ATTRIBUTE, topicName));
        }

        final List<String> messageIds;
        try {
            messageIds = ApiFutures.allAsList(futures).get();
        } catch (final InterruptedException | ExecutionException e) {
            this.getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, routing all messages from the batch to failure", new Object[]{topicName, e.getLocalizedMessage()}, (Throwable) e);
            session.transfer(pending, REL_FAILURE);
            context.yield();
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
            return;
        }

        // Futures were added in the same order as 'pending', so index i of the result list
        // belongs to the FlowFile at index i.
        final List<FlowFile> published = new ArrayList<FlowFile>(pending.size());
        for (int i = 0; i < pending.size(); i++) {
            FlowFile flowFile = pending.get(i);
            final String messageId = messageIds.get(i);
            if (messageId != null) {
                flowFile = session.putAttribute(flowFile, MESSAGE_ID_ATTRIBUTE, messageId);
            }
            published.add(flowFile);
        }

        session.transfer(published, REL_SUCCESS);
        final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        for (final FlowFile flowFile : published) {
            session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
        }
    }

    /**
     * Stops the publisher and waits for it to terminate.
     *
     * <p>The field is cleared before the shutdown is attempted so that the reference is
     * dropped even when the shutdown throws; the next {@link #onScheduled(ProcessContext)}
     * then always creates a new publisher.
     */
    @OnStopped
    public void onStopped() {
        final Publisher stopping = this.publisher;
        this.publisher = null;
        if (stopping == null) {
            return;
        }
        try {
            stopping.stopAsync().awaitTerminated();
        } catch (final Exception e) {
            this.getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Publisher", (Throwable) e);
        }
    }

    /**
     * Evaluates every dynamic property against the given FlowFile.
     *
     * @param context context providing the configured properties
     * @param flowFile FlowFile whose attributes are used for expression language evaluation
     * @return map of dynamic property name to evaluated value
     */
    private Map<String, String> getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) {
        final Map<String, String> attributes = new HashMap<String, String>();
        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (descriptor.isDynamic()) {
                final String value = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
                attributes.put(descriptor.getName(), value);
            }
        }
        return attributes;
    }

    /**
     * Builds a new, not yet started publisher from the configured topic, credentials and
     * batching properties. The delay threshold of the batching settings is fixed at 100 ms.
     *
     * @param context context providing the configured property values
     * @return a newly created publisher
     */
    private Publisher getPublisher(ProcessContext context) {
        final TopicPath topicPath = TopicPath.parse(context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue());

        final BatchingSettings batchingSettings = BatchingSettings.newBuilder()
                .setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong())
                .setDelayThreshold(Duration.ofMillis(100L))
                .setRequestByteThreshold(Long.valueOf(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue()))
                .setIsEnabled(Boolean.valueOf(true))
                .build();

        final PublisherSettings publisherSettings = PublisherSettings.newBuilder()
                .setTopicPath(topicPath)
                .setCredentialsProvider((CredentialsProvider) FixedCredentialsProvider.create((Credentials) this.getGoogleCredentials(context)))
                .setBatchingSettings(batchingSettings)
                .build();

        return Publisher.create(publisherSettings);
    }

    /**
     * Returns the failure cause of the given publisher, or {@code null} when the publisher
     * refuses to report one by throwing {@link IllegalStateException}.
     *
     * @param service publisher to inspect
     * @return the failure cause, or {@code null} if none could be obtained
     */
    private static Throwable failureCauseOrNull(Publisher service) {
        try {
            return service.failureCause();
        } catch (final IllegalStateException notFailed) {
            // NOTE(review): per the ApiService contract failureCause() is only legal in the
            // FAILED state; any other state is treated here as "no known cause".
            return null;
        }
    }

    /**
     * Verifies the configuration by attempting to create a publisher. The publisher is only
     * created, never started.
     *
     * @param context context providing the configured property values
     * @param verificationLogger logger that receives the failure details
     * @param attributes FlowFile attributes supplied for verification (not used here)
     * @return a single-element list describing whether the publisher could be created
     */
    @Override
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        final List<ConfigVerificationResult> verificationResults = new ArrayList<ConfigVerificationResult>();
        try {
            this.getPublisher(context);
            verificationResults.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Create the Publisher")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation("Successfully created the Google Cloud PubSub Lite Publisher")
                    .build());
        } catch (final Exception e) {
            verificationLogger.error("Failed to create Google Cloud PubSub Lite Publisher", (Throwable) e);
            verificationResults.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Create the Publisher")
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation("Failed to create Google Cloud PubSub Lite Publisher: " + e.getLocalizedMessage())
                    .build());
        }
        return verificationResults;
    }
}

