/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.pubsub;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.TestIamPermissionsRequest;
import com.google.iam.v1.TestIamPermissionsResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubWithProxyProcessor;
import org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub;
import org.apache.nifi.processors.gcp.pubsub.publish.FlowFileResult;
import org.apache.nifi.processors.gcp.pubsub.publish.MessageDerivationStrategy;
import org.apache.nifi.processors.gcp.pubsub.publish.TrackedApiFutureCallback;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
import org.threeten.bp.Duration;

@SeeAlso(value={ConsumeGCPubSub.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"google", "google-cloud", "gcp", "message", "pubsub", "publish"})
@CapabilityDescription(value="Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties. If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
@DynamicProperty(name="Attribute name", value="Value to be set to the attribute", description="Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttributes(value={@WritesAttribute(attribute="gcp.pubsub.messageId", description="ID of the pubsub message published to the configured Google Cloud PubSub topic"), @WritesAttribute(attribute="gcp.pubsub.count.records", description="Count of pubsub messages published to the configured Google Cloud PubSub topic"), @WritesAttribute(attribute="gcp.pubsub.topic", description="Name of the Google Cloud PubSub topic the message was published to")})
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="The entirety of the FlowFile's content will be read into memory to be sent as a PubSub message.")
public class PublishGCPubSub
extends AbstractGCPubSubWithProxyProcessor {
    private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
    private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder().name("Input Batch Size").displayName("Input Batch Size").description("Maximum number of FlowFiles processed for each Processor invocation").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).addValidator(StandardValidators.NUMBER_VALIDATOR).defaultValue("100").build();
    public static final PropertyDescriptor MESSAGE_DERIVATION_STRATEGY = new PropertyDescriptor.Builder().name("Message Derivation Strategy").displayName("Message Derivation Strategy").description("The strategy used to publish the incoming FlowFile to the Google Cloud PubSub endpoint.").required(true).defaultValue(MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue()).allowableValues(MessageDerivationStrategy.class).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").displayName("Record Reader").description("The Record Reader to use for incoming FlowFiles").identifiesControllerService(RecordReaderFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue(), new String[0]).required(true).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").displayName("Record Writer").description("The Record Writer to use in order to serialize the data before sending to GCPubSub endpoint").identifiesControllerService(RecordSetWriterFactory.class).expressionLanguageSupported(ExpressionLanguageScope.NONE).dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.RECORD_ORIENTED.getValue(), new String[0]).required(true).build();
    public static final PropertyDescriptor MAX_MESSAGE_SIZE = new PropertyDescriptor.Builder().name("Maximum Message Size").displayName("Maximum Message Size").description("The maximum size of a Google PubSub message in bytes. Defaults to 1 MB (1048576 bytes)").dependsOn(MESSAGE_DERIVATION_STRATEGY, MessageDerivationStrategy.FLOWFILE_ORIENTED.getValue(), new String[0]).required(true).addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").build();
    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder().name("gcp-pubsub-topic").displayName("Topic Name").description("Name of the Google Cloud PubSub Topic").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.").build();
    protected Publisher publisher = null;

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        descriptors.add(MAX_BATCH_SIZE);
        descriptors.add(MESSAGE_DERIVATION_STRATEGY);
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        descriptors.add(MAX_MESSAGE_SIZE);
        descriptors.add(TOPIC_NAME);
        descriptors.add(BATCH_SIZE_THRESHOLD);
        descriptors.add(BATCH_BYTES_THRESHOLD);
        descriptors.add(BATCH_DELAY_THRESHOLD);
        descriptors.add(API_ENDPOINT);
        return Collections.unmodifiableList(descriptors);
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().required(false).name(propertyDescriptorName).displayName(propertyDescriptorName).addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR).dynamic(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY)));
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        try {
            this.publisher = this.getPublisherBuilder(context).build();
        }
        catch (IOException e) {
            throw new ProcessException("Failed to create Google Cloud PubSub Publisher", (Throwable)e);
        }
    }

    public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) {
        ArrayList<ConfigVerificationResult> results = new ArrayList<ConfigVerificationResult>();
        Publisher publisher = null;
        try {
            publisher = this.getPublisherBuilder(context).build();
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Publisher").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation("Successfully created Publisher").build());
        }
        catch (IOException e) {
            verificationLogger.error("Failed to create Publisher", (Throwable)e);
            results.add(new ConfigVerificationResult.Builder().verificationStepName("Create Publisher").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("Failed to create Publisher: " + e.getMessage(), new Object[0])).build());
        }
        if (publisher != null) {
            try {
                PublisherStubSettings publisherStubSettings = ((PublisherStubSettings.Builder)((PublisherStubSettings.Builder)PublisherStubSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context)))).setTransportChannelProvider(this.getTransportChannelProvider(context))).build();
                try (GrpcPublisherStub publisherStub = GrpcPublisherStub.create((PublisherStubSettings)publisherStubSettings);){
                    String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
                    TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder().addAllPermissions(REQUIRED_PERMISSIONS).setResource(topicName).build();
                    TestIamPermissionsResponse response = (TestIamPermissionsResponse)publisherStub.testIamPermissionsCallable().call((Object)request);
                    if (response.getPermissionsCount() >= REQUIRED_PERMISSIONS.size()) {
                        results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(String.format("Verified Topic [%s] exists and the configured user has the correct permissions.", topicName)).build());
                    } else {
                        results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user does not have the correct permissions on Topic [%s].", topicName)).build());
                    }
                }
            }
            catch (ApiException e) {
                verificationLogger.error("The configured user appears to have the correct permissions, but the following error was encountered", (Throwable)e);
                results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The configured user appears to have the correct permissions, but the following error was encountered: " + e.getMessage(), new Object[0])).build());
            }
            catch (IOException e) {
                verificationLogger.error("The publisher stub could not be created in order to test the permissions", (Throwable)e);
                results.add(new ConfigVerificationResult.Builder().verificationStepName("Test IAM Permissions").outcome(ConfigVerificationResult.Outcome.FAILED).explanation(String.format("The publisher stub could not be created in order to test the permissions: " + e.getMessage(), new Object[0])).build());
            }
        }
        return results;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        StopWatch stopWatch = new StopWatch(true);
        MessageDerivationStrategy inputStrategy = MessageDerivationStrategy.valueOf(context.getProperty(MESSAGE_DERIVATION_STRATEGY).getValue());
        int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
        List flowFileBatch = session.get(maxBatchSize);
        if (flowFileBatch.isEmpty()) {
            context.yield();
        } else if (MessageDerivationStrategy.FLOWFILE_ORIENTED.equals((Object)inputStrategy)) {
            this.onTriggerFlowFileStrategy(context, session, stopWatch, flowFileBatch);
        } else if (MessageDerivationStrategy.RECORD_ORIENTED.equals((Object)inputStrategy)) {
            this.onTriggerRecordStrategy(context, session, stopWatch, flowFileBatch);
        } else {
            throw new IllegalStateException(inputStrategy.getValue());
        }
    }

    private void onTriggerFlowFileStrategy(ProcessContext context, ProcessSession session, StopWatch stopWatch, List<FlowFile> flowFileBatch) throws ProcessException {
        long maxMessageSize = context.getProperty(MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).longValue();
        Executor executor = MoreExecutors.directExecutor();
        ArrayList<FlowFileResult> flowFileResults = new ArrayList<FlowFileResult>();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        for (FlowFile flowFile : flowFileBatch) {
            ArrayList<ApiFuture<String>> futures = new ArrayList<ApiFuture<String>>();
            ArrayList<String> successes = new ArrayList<String>();
            ArrayList<Throwable> failures = new ArrayList<Throwable>();
            if (flowFile.getSize() > maxMessageSize) {
                String message = String.format("FlowFile size %d exceeds MAX_MESSAGE_SIZE", flowFile.getSize());
                failures.add(new IllegalArgumentException(message));
                flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
                continue;
            }
            baos.reset();
            session.exportTo(flowFile, (OutputStream)baos);
            ApiFuture<String> apiFuture = this.publishOneMessage(context, flowFile, baos.toByteArray());
            futures.add(apiFuture);
            this.addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
            flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
        }
        this.finishBatch(session, stopWatch, flowFileResults);
    }

    private void onTriggerRecordStrategy(ProcessContext context, ProcessSession session, StopWatch stopWatch, List<FlowFile> flowFileBatch) throws ProcessException {
        try {
            this.onTriggerRecordStrategyPublishRecords(context, session, stopWatch, flowFileBatch);
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            throw new ProcessException("Record publishing failed", e);
        }
    }

    private void onTriggerRecordStrategyPublishRecords(ProcessContext context, ProcessSession session, StopWatch stopWatch, List<FlowFile> flowFileBatch) throws ProcessException, IOException, SchemaNotFoundException, MalformedRecordException {
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        Executor executor = MoreExecutors.directExecutor();
        ArrayList<FlowFileResult> flowFileResults = new ArrayList<FlowFileResult>();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        for (FlowFile flowFile : flowFileBatch) {
            ArrayList<ApiFuture<String>> futures = new ArrayList<ApiFuture<String>>();
            ArrayList<String> successes = new ArrayList<String>();
            ArrayList<Throwable> failures = new ArrayList<Throwable>();
            Map attributes = flowFile.getAttributes();
            RecordReader reader = readerFactory.createRecordReader(attributes, session.read(flowFile), flowFile.getSize(), this.getLogger());
            try {
                RecordSet recordSet = reader.createRecordSet();
                RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());
                RecordSetWriter writer = writerFactory.createWriter(this.getLogger(), schema, (OutputStream)baos, attributes);
                PushBackRecordSet pushBackRecordSet = new PushBackRecordSet(recordSet);
                while (pushBackRecordSet.isAnotherRecord()) {
                    ApiFuture<String> apiFuture = this.publishOneRecord(context, flowFile, baos, writer, pushBackRecordSet.next());
                    futures.add(apiFuture);
                    this.addCallback(apiFuture, new TrackedApiFutureCallback(successes, failures), executor);
                }
                flowFileResults.add(new FlowFileResult(flowFile, futures, successes, failures));
            }
            finally {
                if (reader == null) continue;
                reader.close();
            }
        }
        this.finishBatch(session, stopWatch, flowFileResults);
    }

    private ApiFuture<String> publishOneRecord(ProcessContext context, FlowFile flowFile, ByteArrayOutputStream baos, RecordSetWriter writer, Record record) throws IOException {
        baos.reset();
        writer.write(record);
        writer.flush();
        return this.publishOneMessage(context, flowFile, baos.toByteArray());
    }

    private ApiFuture<String> publishOneMessage(ProcessContext context, FlowFile flowFile, byte[] content) {
        PubsubMessage message = PubsubMessage.newBuilder().setData(ByteString.copyFrom((byte[])content)).setPublishTime(Timestamp.newBuilder().build()).putAllAttributes(this.getDynamicAttributesMap(context, flowFile)).build();
        return this.publisher.publish(message);
    }

    private void finishBatch(ProcessSession session, StopWatch stopWatch, List<FlowFileResult> flowFileResults) {
        String topicName = this.publisher.getTopicNameString();
        for (FlowFileResult flowFileResult : flowFileResults) {
            Relationship relationship = flowFileResult.reconcile();
            Map<String, String> attributes = flowFileResult.getAttributes();
            attributes.put("gcp.pubsub.topic", topicName);
            FlowFile flowFile = session.putAllAttributes(flowFileResult.getFlowFile(), attributes);
            String transitUri = String.format(TRANSIT_URI_FORMAT_STRING, topicName);
            session.getProvenanceReporter().send(flowFile, transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, relationship);
        }
    }

    protected void addCallback(ApiFuture<String> apiFuture, ApiFutureCallback<? super String> callback, Executor executor) {
        ApiFutures.addCallback(apiFuture, callback, (Executor)executor);
    }

    @OnStopped
    public void onStopped() {
        this.shutdownPublisher();
    }

    private void shutdownPublisher() {
        try {
            if (this.publisher != null) {
                this.publisher.shutdown();
            }
        }
        catch (Exception e) {
            this.getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Publisher", (Throwable)e);
        }
    }

    private ProjectTopicName getTopicName(ProcessContext context) {
        String topic = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
        String projectId = context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
        if (topic.contains("/")) {
            return ProjectTopicName.parse((String)topic);
        }
        return ProjectTopicName.of((String)projectId, (String)topic);
    }

    private Map<String, String> getDynamicAttributesMap(ProcessContext context, FlowFile flowFile) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        for (Map.Entry entry : context.getProperties().entrySet()) {
            if (!((PropertyDescriptor)entry.getKey()).isDynamic()) continue;
            String value = context.getProperty((PropertyDescriptor)entry.getKey()).evaluateAttributeExpressions(flowFile).getValue();
            attributes.put(((PropertyDescriptor)entry.getKey()).getName(), value);
        }
        return attributes;
    }

    private Publisher.Builder getPublisherBuilder(ProcessContext context) {
        Long batchSizeThreshold = context.getProperty(BATCH_SIZE_THRESHOLD).asLong();
        long batchBytesThreshold = context.getProperty(BATCH_BYTES_THRESHOLD).asDataSize(DataUnit.B).longValue();
        Long batchDelayThreshold = context.getProperty(BATCH_DELAY_THRESHOLD).asTimePeriod(TimeUnit.MILLISECONDS);
        String endpoint = context.getProperty(API_ENDPOINT).getValue();
        Publisher.Builder publisherBuilder = Publisher.newBuilder((TopicName)this.getTopicName(context)).setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)this.getGoogleCredentials(context))).setChannelProvider(this.getTransportChannelProvider(context)).setEndpoint(endpoint);
        publisherBuilder.setBatchingSettings(BatchingSettings.newBuilder().setElementCountThreshold(batchSizeThreshold).setRequestByteThreshold(Long.valueOf(batchBytesThreshold)).setDelayThreshold(Duration.ofMillis((long)batchDelayThreshold)).setIsEnabled(Boolean.valueOf(true)).build());
        return publisherBuilder;
    }
}

