/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.azure.cosmos.document;

import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ConflictException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.cosmos.document.AbstractAzureCosmosDBProcessor;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;

@EventDriven
@Tags(value={"azure", "cosmos", "insert", "record", "put"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="This processor is a record-aware processor for inserting data into Cosmos DB with Core SQL API. It uses a configured record reader and schema to read an incoming record set from the body of a Flowfile and then inserts those records into a configured Cosmos DB Container.")
@SystemResourceConsideration(resource=SystemResource.MEMORY)
public class PutAzureCosmosDBRecord
extends AbstractAzureCosmosDBProcessor {
    private String conflictHandlingStrategy;
    static final AllowableValue IGNORE_CONFLICT = new AllowableValue("IGNORE", "Ignore", "Conflicting records will not be inserted, and FlowFile will not be routed to failure");
    static final AllowableValue UPSERT_CONFLICT = new AllowableValue("UPSERT", "Upsert", "Conflicting records will be upserted, and FlowFile will not be routed to failure");
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor INSERT_BATCH_SIZE = new PropertyDescriptor.Builder().name("insert-batch-size").displayName("Insert Batch Size").description("The number of records to group together for one single insert operation against Cosmos DB").defaultValue("20").required(false).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    static final PropertyDescriptor CONFLICT_HANDLE_STRATEGY = new PropertyDescriptor.Builder().name("azure-cosmos-db-conflict-handling-strategy").displayName("Cosmos DB Conflict Handling Strategy").description("Choose whether to ignore or upsert when conflict error occurs during insertion").required(false).allowableValues(new AllowableValue[]{IGNORE_CONFLICT, UPSERT_CONFLICT}).defaultValue(IGNORE_CONFLICT.getValue()).addValidator(StandardValidators.NON_BLANK_VALIDATOR).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    protected void bulkInsert(List<Map<String, Object>> records) throws CosmosException {
        ComponentLog logger = this.getLogger();
        CosmosContainer container = this.getContainer();
        for (Map<String, Object> record : records) {
            try {
                container.createItem(record);
            }
            catch (ConflictException e) {
                if (this.conflictHandlingStrategy != null && this.conflictHandlingStrategy.equals(UPSERT_CONFLICT.getValue())) {
                    container.upsertItem(record);
                    continue;
                }
                if (!logger.isDebugEnabled()) continue;
                logger.debug("Ignoring duplicate based on selected conflict resolution strategy");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        ComponentLog logger = this.getLogger();
        RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        String partitionKeyField = context.getProperty(PARTITION_KEY).getValue();
        ArrayList<Map<String, Object>> batch = new ArrayList<Map<String, Object>>();
        int ceiling = context.getProperty(INSERT_BATCH_SIZE).asInteger();
        boolean error = false;
        try (InputStream inStream = session.read(flowFile);
             RecordReader reader = recordParserFactory.createRecordReader(flowFile, inStream, this.getLogger());){
            Record record;
            RecordSchema schema = reader.getSchema();
            while ((record = reader.nextRecord()) != null) {
                Map contentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(schema));
                if (contentMap.containsKey("id")) {
                    String idStr;
                    Object idObj = contentMap.get("id");
                    String string = idStr = idObj == null ? "" : String.valueOf(idObj);
                    if (idObj == null || StringUtils.isBlank((CharSequence)idStr)) {
                        contentMap.put("id", UUID.randomUUID().toString());
                    } else {
                        contentMap.put("id", idStr);
                    }
                } else {
                    contentMap.put("id", UUID.randomUUID().toString());
                }
                if (!contentMap.containsKey(partitionKeyField)) {
                    logger.error(String.format("PutAzureCosmoDBRecord failed with missing partitionKeyField (%s)", partitionKeyField));
                    error = true;
                    break;
                }
                batch.add(contentMap);
                if (batch.size() != ceiling) continue;
                this.bulkInsert(batch);
                batch = new ArrayList();
            }
            if (!error && batch.size() > 0) {
                this.bulkInsert(batch);
            }
        }
        catch (CosmosException | IOException | SchemaNotFoundException | MalformedRecordException e) {
            logger.error("PutAzureCosmoDBRecord failed with error: {}", new Object[]{e.getMessage()}, e);
            error = true;
        }
        finally {
            if (!error) {
                session.getProvenanceReporter().send(flowFile, this.getURI(context));
                session.transfer(flowFile, REL_SUCCESS);
            } else {
                session.transfer(flowFile, REL_FAILURE);
            }
        }
    }

    @Override
    protected void doPostActionOnSchedule(ProcessContext context) {
        this.conflictHandlingStrategy = context.getProperty(CONFLICT_HANDLE_STRATEGY).getValue();
        if (this.conflictHandlingStrategy == null) {
            this.conflictHandlingStrategy = IGNORE_CONFLICT.getValue();
        }
    }

    static {
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(RECORD_READER_FACTORY);
        _propertyDescriptors.add(INSERT_BATCH_SIZE);
        _propertyDescriptors.add(CONFLICT_HANDLE_STRATEGY);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

