/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.sqs;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.sqs.AbstractSQSProcessor;
import org.apache.nifi.processors.aws.sqs.DeleteSQS;
import org.apache.nifi.processors.aws.sqs.GetSQS;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;

@SupportsBatching
@SeeAlso(value={GetSQS.class, DeleteSQS.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
@CapabilityDescription(value="Publishes a message to an Amazon Simple Queuing Service Queue")
@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute", description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of the Message Attribute and value will become the value of the Message Attribute", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
public class PutSQS
extends AbstractSQSProcessor {
    private static final String STRING_DATA_TYPE = "String";
    public static final PropertyDescriptor DELAY = new PropertyDescriptor.Builder().name("Delay").displayName("Delay").description("The amount of time to delay the message before it becomes available to consumers").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("0 secs").build();
    public static final PropertyDescriptor MESSAGEGROUPID = new PropertyDescriptor.Builder().name("message-group-id").displayName("Message Group ID").description("If using FIFO, the message group to which the FlowFile belongs").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor MESSAGEDEDUPLICATIONID = new PropertyDescriptor.Builder().name("deduplication-message-id").displayName("Deduplication Message ID").description("The token used for deduplication of sent messages").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, SSL_CONTEXT_SERVICE, REGION, DELAY, TIMEOUT, ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, MESSAGEGROUPID, MESSAGEDEDUPLICATIONID));
    private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList();

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder().name(propertyDescriptorName).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(false).dynamic(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        this.userDefinedProperties = new ArrayList<PropertyDescriptor>();
        for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (!descriptor.isDynamic()) continue;
            this.userDefinedProperties.add(descriptor);
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        long startNanos = System.nanoTime();
        SqsClient client = (SqsClient)this.getClient(context);
        String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(flowFile, (OutputStream)baos);
        String flowFileContent = baos.toString();
        HashMap<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>();
        for (PropertyDescriptor descriptor : this.userDefinedProperties) {
            MessageAttributeValue mav = (MessageAttributeValue)MessageAttributeValue.builder().dataType(STRING_DATA_TYPE).stringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue()).build();
            messageAttributes.put(descriptor.getName(), mav);
        }
        SendMessageBatchRequestEntry entry = (SendMessageBatchRequestEntry)SendMessageBatchRequestEntry.builder().messageBody(flowFileContent).messageGroupId(context.getProperty(MESSAGEGROUPID).evaluateAttributeExpressions(flowFile).getValue()).messageDeduplicationId(context.getProperty(MESSAGEDEDUPLICATIONID).evaluateAttributeExpressions(flowFile).getValue()).id(flowFile.getAttribute(CoreAttributes.UUID.key())).messageAttributes(messageAttributes).delaySeconds(Integer.valueOf(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue())).build();
        SendMessageBatchRequest request = (SendMessageBatchRequest)SendMessageBatchRequest.builder().queueUrl(queueUrl).entries(new SendMessageBatchRequestEntry[]{entry}).build();
        try {
            SendMessageBatchResponse response = client.sendMessageBatch(request);
            if (!response.failed().isEmpty()) {
                throw new ProcessException(((BatchResultErrorEntry)response.failed().get(0)).toString());
            }
        }
        catch (Exception e) {
            this.getLogger().error("Failed to send messages to Amazon SQS", (Throwable)e);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        this.getLogger().info("Successfully published message to Amazon SQS for {}", new Object[]{flowFile});
        session.transfer(flowFile, REL_SUCCESS);
        long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        session.getProvenanceReporter().send(flowFile, queueUrl, transmissionMillis);
    }
}

