/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub.lite;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
import org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite;

/**
 * Processor that consumes messages from a Google Cloud Pub/Sub Lite subscription and emits one
 * FlowFile per message.
 *
 * <p>A {@link Subscriber} is created and started when the processor is scheduled. Its receiver
 * callback pushes every delivered message, together with its {@link AckReplyConsumer}, onto an
 * internal queue. Each {@code onTrigger} invocation takes at most one message from that queue,
 * converts it to a FlowFile routed to {@code REL_SUCCESS} and acknowledges it.
 */
@SeeAlso(value={PublishGCPubSubLite.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
@CapabilityDescription(value="Consumes message from the configured Google Cloud PubSub Lite subscription.")
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.messageId", description="ID of the pubsub message published to the configured Google Cloud PubSub topic"), @WritesAttribute(attribute="gcp.pubsub.ordering.key", description="If non-empty, identifies related messages for which publish order should be respected. If a 'Subscription' has 'enable_message_ordering' set to 'true', messages published with the same non-empty 'ordering_key' value will be delivered to subscribers in the order in which they are received by the Pub/Sub system. All 'PubsubMessage's published in a given 'PublishRequest' must specify the same 'ordering_key' value."), @WritesAttribute(attribute="gcp.pubsub.attributesCount", description="Number of attributes the consumed PubSub message has, if any"), @WritesAttribute(attribute="gcp.pubsub.publishTime", description="Timestamp value when the message was published"), @WritesAttribute(attribute="Dynamic Attributes", description="Other than the listed attributes, this processor may write zero or more attributes, if the original Google Cloud Publisher client added any attributes to the message while sending")})
public class ConsumeGCPubSubLite
extends AbstractGCPubSubProcessor
implements VerifiableProcessor {
    /** Full path of the Pub/Sub Lite subscription to consume from. */
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
            .name("gcp-pubsub-subscription")
            .displayName("Subscription")
            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
            .description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Flow-control limit, as a data size, applied per partition (see {@code getSubscriber}). */
    public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor.Builder()
            .name("gcp-bytes-outstanding")
            .displayName("Bytes Outstanding")
            .description("The number of quota bytes that may be outstanding to the client.")
            .required(true)
            .defaultValue("10 MB")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();

    /** Flow-control limit, as a message count, applied per partition (see {@code getSubscriber}). */
    public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor.Builder()
            .name("gcp-messages-outstanding")
            .displayName("Messages Outstanding")
            .description("The number of messages that may be outstanding to the client.")
            .required(true)
            .defaultValue("1000")
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    /** Subscriber created in {@code onScheduled} and discarded in {@code onStopped}. */
    private Subscriber subscriber = null;

    /** Hand-off queue between the subscriber's receiver callback and {@code onTrigger}. */
    private final BlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    /**
     * Checks that the configured subscription can be parsed as a Pub/Sub Lite subscription path.
     *
     * @param validationContext context used to read the {@code SUBSCRIPTION} property
     * @return a collection holding one invalid result when the path cannot be parsed, otherwise
     *         an empty collection
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> results = new ArrayList<>(1);
        String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
        if (subscription == null) {
            // Nothing to parse. The property is declared required, so an unset value is expected
            // to be reported by the property's own validation rather than by a failure here.
            return results;
        }
        try {
            SubscriptionPath.parse(subscription);
        }
        catch (ApiException e) {
            results.add(new ValidationResult.Builder()
                    .subject(SUBSCRIPTION.getName())
                    .input(subscription)
                    .valid(false)
                    .explanation("The Subscription does not have a valid format.")
                    .build());
        }
        return results;
    }

    /**
     * Creates the subscriber if none exists yet, then starts it and waits until it is running.
     *
     * @param context process context the subscriber settings are read from
     * @throws ProcessException if the subscriber cannot be created or does not reach the running state
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        try {
            if (this.subscriber == null) {
                this.subscriber = this.getSubscriber(context);
            }
        }
        catch (Exception e) {
            this.getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e);
            throw new ProcessException(e);
        }
        try {
            this.subscriber.startAsync().awaitRunning();
        }
        catch (Exception e) {
            this.getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", this.subscriber.failureCause());
            throw new ProcessException(e);
        }
    }

    /**
     * Stops the subscriber, waits for it to terminate and drops the reference to it.
     *
     * <p>Failures while stopping are logged as warnings and not rethrown.
     */
    @OnStopped
    public void onStopped() {
        try {
            if (this.subscriber != null) {
                this.subscriber.stopAsync().awaitTerminated();
            }
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e);
        }
        finally {
            // Always drop the reference, even when stopping throws (for example because the
            // subscriber has already failed). Otherwise the next onScheduled would skip creation
            // and try to start an instance that can no longer run.
            this.subscriber = null;
            // NOTE(review): messages still sitting in the queue are kept across a stop/start
            // cycle; confirm whether they should be discarded here.
        }
    }

    /**
     * @return the subscription, credentials service and the two flow-control properties
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Collections.unmodifiableList(Arrays.asList(SUBSCRIPTION, GCP_CREDENTIALS_PROVIDER_SERVICE, BYTES_OUTSTANDING, MESSAGES_OUTSTANDING));
    }

    /**
     * @return a singleton set containing only {@code REL_SUCCESS}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    /**
     * Converts at most one queued Pub/Sub Lite message into a FlowFile.
     *
     * <p>Yields when there is no subscriber or no queued message. Otherwise a FlowFile is created
     * whose content is the raw message payload and whose attributes are the message id, ordering
     * key, attribute count, publish time (in seconds) and all message attributes. The FlowFile is
     * transferred to {@code REL_SUCCESS}, a provenance RECEIVE event is reported and the message
     * is acknowledged.
     *
     * @param context process context
     * @param session session used to create and transfer the FlowFile
     * @throws ProcessException if the subscriber exists but is not running
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.subscriber == null) {
            this.getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor...");
            context.yield();
            return;
        }
        if (!this.subscriber.isRunning()) {
            this.getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", this.subscriber.failureCause());
            throw new ProcessException(this.subscriber.failureCause());
        }

        final Message message = this.messages.poll();
        if (message == null) {
            context.yield();
            return;
        }
        final PubsubMessage pubsubMessage = message.getMessage();

        Map<String, String> attributes = new HashMap<>();
        attributes.put("gcp.pubsub.messageId", pubsubMessage.getMessageId());
        attributes.put("gcp.pubsub.ordering.key", pubsubMessage.getOrderingKey());
        attributes.put("gcp.pubsub.attributesCount", String.valueOf(pubsubMessage.getAttributesCount()));
        attributes.put("gcp.pubsub.publishTime", String.valueOf(pubsubMessage.getPublishTime().getSeconds()));
        // Message attributes are added last, so they win on a key collision with the entries above.
        attributes.putAll(pubsubMessage.getAttributesMap());

        FlowFile flowFile = session.create();
        flowFile = session.putAllAttributes(flowFile, attributes);
        // Write the payload bytes untouched. Decoding to a String and re-encoding with the
        // platform charset would corrupt binary payloads and non-UTF-8-default JVMs.
        flowFile = session.write(flowFile, out -> out.write(pubsubMessage.getData().toByteArray()));
        session.transfer(flowFile, REL_SUCCESS);
        session.getProvenanceReporter().receive(flowFile, context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());

        // NOTE(review): the ack is sent here, presumably before the framework commits the
        // session; confirm whether it should be deferred until the commit has succeeded.
        message.getConsumer().ack();
    }

    /**
     * Builds (but does not start) a subscriber from the processor configuration.
     *
     * @param context process context providing subscription path, flow-control limits and credentials
     * @return a new subscriber whose receiver enqueues every delivered message
     */
    private Subscriber getSubscriber(ProcessContext context) {
        SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());

        long bytesOutstanding = context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue();
        long messagesOutstanding = context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong().longValue();
        FlowControlSettings flowControlSettings = FlowControlSettings.builder()
                .setBytesOutstanding(bytesOutstanding)
                .setMessagesOutstanding(messagesOutstanding)
                .build();

        MessageReceiver receiver = (message, consumer) -> {
            try {
                this.messages.put(new Message(message, consumer));
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so the calling thread can still observe the interruption.
                Thread.currentThread().interrupt();
                this.getLogger().error("Could not save the message inside the internal queue of the processor", e);
            }
        };

        CredentialsProvider credentialsProvider = FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context));
        SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder()
                .setCredentialsProvider(credentialsProvider)
                .setSubscriptionPath(subscriptionPath)
                .setReceiver(receiver)
                .setPerPartitionFlowControlSettings(flowControlSettings)
                .build();
        return Subscriber.create(subscriberSettings);
    }

    /**
     * Verifies the configuration by attempting to build a subscriber from it. The subscriber is
     * only instantiated, never started.
     *
     * @param context process context holding the configuration to verify
     * @param verificationLogger logger that receives the failure details
     * @param attributes not used by this implementation
     * @return a single-element list with the outcome of the "Create the Subscriber" step
     */
    @Override
    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        List<ConfigVerificationResult> verificationResults = new ArrayList<>();
        try {
            this.getSubscriber(context);
            verificationResults.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Create the Subscriber")
                    .outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
                    .explanation("Successfully created the Google Cloud PubSub Lite Subscriber")
                    .build());
        }
        catch (Exception e) {
            verificationLogger.error("Failed to create Google Cloud PubSub Lite Subscriber", e);
            verificationResults.add(new ConfigVerificationResult.Builder()
                    .verificationStepName("Create the Subscriber")
                    .outcome(ConfigVerificationResult.Outcome.FAILED)
                    .explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage())
                    .build());
        }
        return verificationResults;
    }

    /**
     * Returns the credentials of the parent processor restricted to the cloud-platform scope.
     *
     * @param context process context passed through to the parent implementation
     * @return scoped Google credentials
     */
    @Override
    protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
        return super.getGoogleCredentials(context).createScoped(new String[]{"https://www.googleapis.com/auth/cloud-platform"});
    }

    /**
     * Immutable pair of a received message and the consumer used to acknowledge it.
     * Declared static because it needs no reference to the enclosing processor.
     */
    private static final class Message {
        private final PubsubMessage message;
        private final AckReplyConsumer consumer;

        Message(PubsubMessage message, AckReplyConsumer consumer) {
            this.message = message;
            this.consumer = consumer;
        }

        PubsubMessage getMessage() {
            return this.message;
        }

        AckReplyConsumer getConsumer() {
            return this.consumer;
        }
    }
}

