/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.aws.kinesis.firehose;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.aws.kinesis.firehose.AbstractKinesisFirehoseProcessor;

/**
 * Processor that publishes the content of incoming FlowFiles to Amazon Kinesis Firehose.
 *
 * <p>On each trigger a batch of FlowFiles is obtained from the superclass size filter, grouped by
 * the delivery stream name evaluated against each FlowFile, and sent with one
 * {@code PutRecordBatch} request per delivery stream. Every FlowFile is then routed to
 * {@code REL_SUCCESS} or {@code REL_FAILURE} based on its individual response entry.
 */
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"amazon", "aws", "firehose", "kinesis", "put", "stream"})
@CapabilityDescription(value="Sends the contents to a specified Amazon Kinesis Firehose. In order to send data to firehose, the firehose delivery stream name has to be specified.")
@WritesAttributes(value={@WritesAttribute(attribute="aws.kinesis.firehose.error.message", description="Error message on posting message to AWS Kinesis Firehose"), @WritesAttribute(attribute="aws.kinesis.firehose.error.code", description="Error code for the message when posting to AWS Kinesis Firehose"), @WritesAttribute(attribute="aws.kinesis.firehose.record.id", description="Record id of the message posted to Kinesis Firehose")})
public class PutKinesisFirehose
extends AbstractKinesisFirehoseProcessor {
    /** FlowFile attribute that receives the error message of a rejected record. */
    public static final String AWS_KINESIS_FIREHOSE_ERROR_MESSAGE = "aws.kinesis.firehose.error.message";
    /** FlowFile attribute that receives the error code of a rejected record. */
    public static final String AWS_KINESIS_FIREHOSE_ERROR_CODE = "aws.kinesis.firehose.error.code";
    /** FlowFile attribute that receives the record id returned in the response entry. */
    public static final String AWS_KINESIS_FIREHOSE_RECORD_ID = "aws.kinesis.firehose.record.id";
    /** Immutable list of the property descriptors supported by this processor. */
    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD, ENDPOINT_OVERRIDE));
    /**
     * Size limit in bytes. Not referenced inside this class; presumably applied by the size
     * filtering implemented in the superclass (to be confirmed against that class).
     */
    public static final int MAX_MESSAGE_SIZE = 1024000;

    /**
     * Returns the property descriptors supported by this processor.
     *
     * @return the shared unmodifiable {@link #properties} list
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Publishes one batch of FlowFiles to Kinesis Firehose and routes each FlowFile according to
     * the outcome of its record.
     *
     * <p>A failing {@code PutRecordBatch} request only fails the FlowFiles that belong to that
     * request's delivery stream; FlowFiles of streams whose request completed are still routed by
     * their own response entries. Any other exception routes the whole batch to
     * {@code REL_FAILURE}. In both cases the processor yields.
     *
     * @param context provides the configured property values and the yield hook
     * @param session session used to read FlowFile content, update attributes and transfer FlowFiles
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
        List<FlowFile> flowFiles = this.filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
        // Both maps are keyed by delivery stream name and filled in lock-step, so index i of a
        // stream's record list always corresponds to index i of the same stream's FlowFile list.
        Map<String, List<FlowFile>> flowFilesByStream = new HashMap<String, List<FlowFile>>();
        Map<String, List<Record>> recordsByStream = new HashMap<String, List<Record>>();
        AmazonKinesisFirehoseClient client = (AmazonKinesisFirehoseClient)this.getClient(context);
        try {
            List<FlowFile> failedFlowFiles = new ArrayList<FlowFile>();
            List<FlowFile> successfulFlowFiles = new ArrayList<FlowFile>();
            boolean requestFailed = false;

            // Group the FlowFiles and their payloads by the stream name evaluated per FlowFile.
            for (FlowFile flowFile : flowFiles) {
                String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                session.exportTo(flowFile, baos);

                List<Record> streamRecords = recordsByStream.get(firehoseStreamName);
                if (streamRecords == null) {
                    streamRecords = new ArrayList<Record>();
                    recordsByStream.put(firehoseStreamName, streamRecords);
                }
                List<FlowFile> streamFlowFiles = flowFilesByStream.get(firehoseStreamName);
                if (streamFlowFiles == null) {
                    streamFlowFiles = new ArrayList<FlowFile>();
                    flowFilesByStream.put(firehoseStreamName, streamFlowFiles);
                }
                streamFlowFiles.add(flowFile);
                streamRecords.add(new Record().withData(ByteBuffer.wrap(baos.toByteArray())));
            }

            // Send one batch request per delivery stream.
            for (Map.Entry<String, List<Record>> batch : recordsByStream.entrySet()) {
                String streamName = batch.getKey();
                List<Record> records = batch.getValue();
                if (records.isEmpty()) {
                    continue;
                }
                List<FlowFile> streamFlowFiles = flowFilesByStream.get(streamName);

                PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
                putRecordBatchRequest.setDeliveryStreamName(streamName);
                putRecordBatchRequest.setRecords(records);

                PutRecordBatchResult results;
                try {
                    results = client.putRecordBatch(putRecordBatchRequest);
                }
                catch (Exception exception) {
                    // Only this stream's FlowFiles failed. Batches already accepted for other
                    // streams must not be routed to failure, otherwise a retry re-sends them.
                    this.getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{streamFlowFiles, exception});
                    failedFlowFiles.addAll(streamFlowFiles);
                    requestFailed = true;
                    records.clear();
                    continue;
                }

                // Response entries are matched to FlowFiles by position within the stream's batch.
                List<PutRecordBatchResponseEntry> responseEntries = results.getRequestResponses();
                for (int i = 0; i < responseEntries.size(); ++i) {
                    PutRecordBatchResponseEntry entry = responseEntries.get(i);
                    FlowFile flowFile = streamFlowFiles.get(i);

                    Map<String, String> attributes = new HashMap<String, String>();
                    attributes.put(AWS_KINESIS_FIREHOSE_RECORD_ID, entry.getRecordId());
                    boolean rejected = !StringUtils.isBlank(entry.getErrorCode());
                    if (rejected) {
                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_CODE, entry.getErrorCode());
                        attributes.put(AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, entry.getErrorMessage());
                    }
                    flowFile = session.putAllAttributes(flowFile, attributes);
                    if (rejected) {
                        failedFlowFiles.add(flowFile);
                    } else {
                        successfulFlowFiles.add(flowFile);
                    }
                }
                // Drop the payload references once the batch has been processed.
                records.clear();
            }

            // Transfers are deferred until every stream has been processed so that the outer
            // catch can still route the complete batch to failure without double transfers.
            if (failedFlowFiles.size() > 0) {
                session.transfer(failedFlowFiles, REL_FAILURE);
                this.getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{failedFlowFiles});
            }
            if (successfulFlowFiles.size() > 0) {
                session.transfer(successfulFlowFiles, REL_SUCCESS);
                this.getLogger().info("Successfully published to kinesis firehose {}", new Object[]{successfulFlowFiles});
            }
            if (requestFailed) {
                context.yield();
            }
        }
        catch (Exception exception) {
            this.getLogger().error("Failed to publish to kinesis firehose {}", new Object[]{flowFiles, exception});
            session.transfer(flowFiles, REL_FAILURE);
            context.yield();
        }
    }
}

