/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.lambda;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.lambda.AWSLambdaClient;
import com.amazonaws.services.lambda.model.InvalidParameterValueException;
import com.amazonaws.services.lambda.model.InvalidRequestContentException;
import com.amazonaws.services.lambda.model.InvocationType;
import com.amazonaws.services.lambda.model.InvokeRequest;
import com.amazonaws.services.lambda.model.InvokeResult;
import com.amazonaws.services.lambda.model.LogType;
import com.amazonaws.services.lambda.model.RequestTooLargeException;
import com.amazonaws.services.lambda.model.ResourceNotFoundException;
import com.amazonaws.services.lambda.model.TooManyRequestsException;
import com.amazonaws.services.lambda.model.UnsupportedMediaTypeException;
import com.amazonaws.util.Base64;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.aws.lambda.AbstractAWSLambdaProcessor;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"amazon", "aws", "lambda", "put"})
@CapabilityDescription(value="Sends the contents to a specified Amazon Lambda Function. The AWS credentials used for authentication must have permissions execute the Lambda function (lambda:InvokeFunction).The FlowFile content must be JSON.")
@WritesAttributes(value={@WritesAttribute(attribute="aws.lambda.result.function.error", description="Function error message in result on posting message to AWS Lambda"), @WritesAttribute(attribute="aws.lambda.result.status.code", description="Status code in the result for the message when posting to AWS Lambda"), @WritesAttribute(attribute="aws.lambda.result.payload", description="Payload in the result from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.result.log", description="Log in the result of the message posted to Lambda"), @WritesAttribute(attribute="aws.lambda.exception.message", description="Exception message on invoking from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.exception.cause", description="Exception cause on invoking from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.exception.error.code", description="Exception error code on invoking from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.exception.request.id", description="Exception request id on invoking from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.exception.status.code", description="Exception status code on invoking from AWS Lambda"), @WritesAttribute(attribute="aws.lambda.exception.error.type", description="Exception error type on invoking from AWS Lambda")})
public class PutLambda
extends AbstractAWSLambdaProcessor {
    public static final String AWS_LAMBDA_RESULT_FUNCTION_ERROR = "aws.lambda.result.function.error";
    public static final String AWS_LAMBDA_RESULT_STATUS_CODE = "aws.lambda.result.status.code";
    public static final String AWS_LAMBDA_RESULT_LOG = "aws.lambda.result.log";
    public static final String AWS_LAMBDA_RESULT_PAYLOAD = "aws.lambda.result.payload";
    public static final String AWS_LAMBDA_EXCEPTION_MESSAGE = "aws.lambda.exception.message";
    public static final String AWS_LAMBDA_EXCEPTION_CAUSE = "aws.lambda.exception.cause";
    public static final String AWS_LAMBDA_EXCEPTION_ERROR_CODE = "aws.lambda.exception.error.code";
    public static final String AWS_LAMBDA_EXCEPTION_REQUEST_ID = "aws.lambda.exception.request.id";
    public static final String AWS_LAMBDA_EXCEPTION_STATUS_CODE = "aws.lambda.exception.status.code";
    public static final String AWS_LAMBDA_EXCEPTION_ERROR_TYPE = "aws.lambda.exception.error.type";
    public static final long MAX_REQUEST_SIZE = 6000000L;
    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(AWS_LAMBDA_FUNCTION_NAME, AWS_LAMBDA_FUNCTION_QUALIFIER, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String functionName = context.getProperty(AWS_LAMBDA_FUNCTION_NAME).getValue();
        String qualifier = context.getProperty(AWS_LAMBDA_FUNCTION_QUALIFIER).getValue();
        if (flowFile.getSize() > 6000000L) {
            this.getLogger().error("Max size for request body is 6mb but was {} for flow file {} for function {}", new Object[]{flowFile.getSize(), flowFile, functionName});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        AWSLambdaClient client = (AWSLambdaClient)this.getClient();
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            session.exportTo(flowFile, (OutputStream)baos);
            InvokeRequest invokeRequest = new InvokeRequest().withFunctionName(functionName).withLogType(LogType.Tail).withInvocationType(InvocationType.RequestResponse).withPayload(ByteBuffer.wrap(baos.toByteArray())).withQualifier(qualifier);
            long startTime = System.nanoTime();
            InvokeResult result = client.invoke(invokeRequest);
            flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_STATUS_CODE, result.getStatusCode().toString());
            if (!StringUtils.isBlank((CharSequence)result.getLogResult())) {
                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_LOG, new String(Base64.decode((String)result.getLogResult()), Charset.defaultCharset()));
            }
            if (result.getPayload() != null) {
                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_PAYLOAD, new String(result.getPayload().array(), Charset.defaultCharset()));
            }
            if (!StringUtils.isBlank((CharSequence)result.getFunctionError())) {
                flowFile = session.putAttribute(flowFile, AWS_LAMBDA_RESULT_FUNCTION_ERROR, result.getFunctionError());
                session.transfer(flowFile, REL_FAILURE);
            } else {
                session.transfer(flowFile, REL_SUCCESS);
                long totalTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
                session.getProvenanceReporter().send(flowFile, functionName, totalTimeMillis);
            }
        }
        catch (InvalidParameterValueException | InvalidRequestContentException | RequestTooLargeException | ResourceNotFoundException | UnsupportedMediaTypeException unrecoverableException) {
            this.getLogger().error("Failed to invoke lambda {} with unrecoverable exception {} for flow file {}", new Object[]{functionName, unrecoverableException, flowFile});
            flowFile = this.populateExceptionAttributes(session, flowFile, (AmazonServiceException)unrecoverableException);
            session.transfer(flowFile, REL_FAILURE);
        }
        catch (TooManyRequestsException retryableServiceException) {
            this.getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}, therefore penalizing flowfile", new Object[]{functionName, retryableServiceException, flowFile});
            flowFile = this.populateExceptionAttributes(session, flowFile, (AmazonServiceException)((Object)retryableServiceException));
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
        catch (AmazonServiceException unrecoverableServiceException) {
            this.getLogger().error("Failed to invoke lambda {} with exception {} for flow file {} sending to fail", new Object[]{functionName, unrecoverableServiceException, flowFile});
            flowFile = this.populateExceptionAttributes(session, flowFile, unrecoverableServiceException);
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
        catch (Exception exception) {
            this.getLogger().error("Failed to invoke lambda {} with exception {} for flow file {}", new Object[]{functionName, exception, flowFile});
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    private FlowFile populateExceptionAttributes(ProcessSession session, FlowFile flowFile, AmazonServiceException exception) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put(AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
        attributes.put(AWS_LAMBDA_EXCEPTION_ERROR_CODE, exception.getErrorCode());
        attributes.put(AWS_LAMBDA_EXCEPTION_REQUEST_ID, exception.getRequestId());
        attributes.put(AWS_LAMBDA_EXCEPTION_STATUS_CODE, Integer.toString(exception.getStatusCode()));
        if (exception.getCause() != null) {
            attributes.put(AWS_LAMBDA_EXCEPTION_CAUSE, exception.getCause().getMessage());
        }
        attributes.put(AWS_LAMBDA_EXCEPTION_ERROR_TYPE, exception.getErrorType().toString());
        attributes.put(AWS_LAMBDA_EXCEPTION_MESSAGE, exception.getErrorMessage());
        flowFile = session.putAllAttributes(flowFile, attributes);
        return flowFile;
    }
}

