/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.pathtemplate.ValidationException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubWithProxyProcessor;
import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;

@SeeAlso(value={PublishGCPubSub.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "consume"})
@CapabilityDescription(value="Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, the configured number of messages will be pulled in a single request, else only one message will be pulled.")
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.ackId", description="Acknowledgement Id of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.messageSize", description="Serialized size of the consumed Google Cloud PubSub message"), @WritesAttribute(attribute="gcp.pubsub.attributesCount", description="Number of attributes the consumed PubSub message has, if any"), @WritesAttribute(attribute="gcp.pubsub.publishTime", description="Timestamp value when the message was published"), @WritesAttribute(attribute="Dynamic Attributes", description="Other than the listed attributes, this processor may write zero or more attributes, if the original Google Cloud Publisher client added any attributes to the message while sending")})
public class ConsumeGCPubSub
extends AbstractGCPubSubWithProxyProcessor {
    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder().name("gcp-pubsub-subscription").displayName("Subscription").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).description("Name of the Google Cloud Pub/Sub Subscription").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    private SubscriberStub subscriber = null;
    private PullRequest pullRequest;
    private final AtomicReference<Exception> storedException = new AtomicReference();

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        Integer batchSize = context.getProperty(BATCH_SIZE_THRESHOLD).asInteger();
        this.pullRequest = PullRequest.newBuilder().setMaxMessages(batchSize.intValue()).setSubscription(this.getSubscriptionName(context, null)).build();
        try {
            this.subscriber = this.getSubscriber(context);
        }
        catch (IOException e) {
            this.storedException.set(e);
            this.getLogger().error("Failed to create Google Cloud Subscriber", (Throwable)e);
        }
    }

    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        ArrayList<ConfigVerificationResult> results = new ArrayList<ConfigVerificationResult>();
        String subscriptionName = null;
        try {
            subscriptionName = this.getSubscriptionName(context, attributes);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully parsed Subscription Name").build());
        }
        catch (ValidationException e) {
            verificationLogger.error("Failed to parse Subscription Name", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Parse Subscription Name").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to parse Subscription Name: " + e.getMessage(), new Object[0])).build());
        }
        SubscriberStub subscriber = null;
        try {
            subscriber = this.getSubscriber(context);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully created Subscriber").build());
        }
        catch (IOException e) {
            verificationLogger.error("Failed to create Subscriber", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Subscriber").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to create Subscriber: " + e.getMessage(), new Object[0])).build());
        }
        if (subscriber != null && subscriptionName != null) {
            try {
                TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(subscriptionName).build();
                if (((TestIamPermissionsResponse)subscriber.testIamPermissionsCallable().call((Object)request)).getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
                    results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Verified Subscription [%s] exists and the configured user has the correct permissions.", subscriptionName)).build());
                } else {
                    results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user does not have the correct permissions on Subscription [%s].", subscriptionName)).build());
                }
            }
            catch (ApiException e) {
                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", (Throwable)e);
                results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage(), new Object[0])).build());
            }
        }
        return results;
    }

    @OnStopped
    public void onStopped() {
        if (this.subscriber != null) {
            this.subscriber.shutdown();
        }
    }

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        descriptors.add(SUBSCRIPTION);
        descriptors.add(BATCH_SIZE_THRESHOLD);
        descriptors.add(API_ENDPOINT);
        return Collections.unmodifiableList(descriptors);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.subscriber == null) {
            if (this.storedException.get() != null) {
                this.getLogger().error("Failed to create Google Cloud PubSub subscriber due to {}", (Throwable)this.storedException.get());
            } else {
                this.getLogger().error("Google Cloud PubSub Subscriber was not properly created. Yielding the processor...");
            }
            context.yield();
            return;
        }
        PullResponse pullResponse = (PullResponse)this.subscriber.pullCallable().call((Object)this.pullRequest);
        ArrayList<String> ackIds = new ArrayList<String>();
        String subscriptionName = this.getSubscriptionName(context, null);
        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
            if (!message.hasMessage()) continue;
            FlowFile flowFile = session.create();
            HashMap<String, String> attributes = new HashMap<String, String>();
            ackIds.add(message.getAckId());
            attributes.put("gcp.pubsub.ackId", message.getAckId());
            attributes.put("gcp.pubsub.messageSize", String.valueOf(message.getSerializedSize()));
            attributes.put("gcp.pubsub.messageId", message.getMessage().getMessageId());
            attributes.put("gcp.pubsub.attributesCount", String.valueOf(message.getMessage().getAttributesCount()));
            attributes.put("gcp.pubsub.publishTime", String.valueOf(message.getMessage().getPublishTime().getSeconds()));
            attributes.putAll(message.getMessage().getAttributesMap());
            flowFile = session.putAllAttributes(flowFile, attributes);
            flowFile = session.write(flowFile, out -> out.write(message.getMessage().getData().toByteArray()));
            session.transfer(flowFile, REL_SUCCESS);
            session.getProvenanceReporter().receive(flowFile, subscriptionName);
        }
        session.commitAsync(() -> this.acknowledgeAcks(ackIds, subscriptionName));
    }

    private void acknowledgeAcks(Collection<String> ackIds, String subscriptionName) {
        if (ackIds == null || ackIds.isEmpty()) {
            return;
        }
        AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder().addAllAckIds(ackIds).setSubscription(subscriptionName).build();
        this.subscriber.acknowledgeCallable().call((Object)acknowledgeRequest);
    }

    private String getSubscriptionName(ProcessContext context, Map<String, String> additionalAttributes) {
        String subscriptionName = context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions(additionalAttributes).getValue();
        String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions(additionalAttributes).getValue();
        if (subscriptionName.contains("/")) {
            return ProjectSubscriptionName.parse((String)subscriptionName).toString();
        }
        return ProjectSubscriptionName.of((String)projectId, (String)subscriptionName).toString();
    }

    private SubscriberStub getSubscriber(ProcessContext context) throws IOException {
        String endpoint = context.getProperty(API_ENDPOINT).getValue();
        SubscriberStubSettings.Builder subscriberBuilder = (SubscriberStubSettings.Builder)((SubscriberStubSettings.Builder)((SubscriberStubSettings.Builder)SubscriberStubSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context)))).setTransportChannelProvider(this.getTransportChannelProvider(context))).setEndpoint(endpoint);
        return GrpcSubscriberStub.create((SubscriberStubSettings)subscriberBuilder.build());
    }
}

