/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.querybuilder.Assignment;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Update;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.cassandra.AbstractCassandraProcessor;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;

@Tags(value={"cassandra", "cql", "put", "insert", "update", "set", "record"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="This is a record aware processor that reads the content of the incoming FlowFile as individual records using the configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.")
@ReadsAttributes(value={@ReadsAttribute(attribute="cql.statement.type", description="If 'Use cql.statement.type Attribute' is selected for the Statement Type property, the value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) will be generated and executed"), @ReadsAttribute(attribute="cql.update.method", description="If 'Use cql.update.method Attribute' is selected for the Update Method property, the value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) will be used to generate and execute the Update statement. Ignored if the Statement Type property is not set to UPDATE"), @ReadsAttribute(attribute="cql.batch.statement.type", description="If 'Use cql.batch.statement.type Attribute' is selected for the Batch Statement Type property, the value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement (LOGGED, UNLOGGED, COUNTER) will be generated and executed")})
public class PutCassandraRecord
extends AbstractCassandraProcessor {
    static final AllowableValue UPDATE_TYPE = new AllowableValue("UPDATE", "UPDATE", "Use an UPDATE statement.");
    static final AllowableValue INSERT_TYPE = new AllowableValue("INSERT", "INSERT", "Use an INSERT statement.");
    static final AllowableValue STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.statement.type Attribute", "The value of the cql.statement.type Attribute will be used to determine which type of statement (UPDATE, INSERT) will be generated and executed");
    static final String STATEMENT_TYPE_ATTRIBUTE = "cql.statement.type";
    static final AllowableValue INCR_TYPE = new AllowableValue("INCREMENT", "Increment", "Use an increment operation (+=) for the Update statement.");
    static final AllowableValue SET_TYPE = new AllowableValue("SET", "Set", "Use a set operation (=) for the Update statement.");
    static final AllowableValue DECR_TYPE = new AllowableValue("DECREMENT", "Decrement", "Use a decrement operation (-=) for the Update statement.");
    static final AllowableValue UPDATE_METHOD_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.update.method Attribute", "The value of the cql.update.method Attribute will be used to determine which operation (Set, Increment, Decrement) will be used to generate and execute the Update statement.");
    static final String UPDATE_METHOD_ATTRIBUTE = "cql.update.method";
    static final AllowableValue LOGGED_TYPE = new AllowableValue("LOGGED", "LOGGED", "Use a LOGGED batch statement");
    static final AllowableValue UNLOGGED_TYPE = new AllowableValue("UNLOGGED", "UNLOGGED", "Use an UNLOGGED batch statement");
    static final AllowableValue COUNTER_TYPE = new AllowableValue("COUNTER", "COUNTER", "Use a COUNTER batch statement");
    static final AllowableValue BATCH_STATEMENT_TYPE_USE_ATTR_TYPE = new AllowableValue("USE_ATTR", "Use cql.batch.statement.type Attribute", "The value of the cql.batch.statement.type Attribute will be used to determine which type of batch statement (LOGGED, UNLOGGED or COUNTER) will be used to generate and execute the Update statement.");
    static final String BATCH_STATEMENT_TYPE_ATTRIBUTE = "cql.batch.statement.type";
    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder().name("put-cassandra-record-reader").displayName("Record Reader").description("Specifies the type of Record Reader controller service to use for parsing the incoming data and determining the schema").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor STATEMENT_TYPE = new PropertyDescriptor.Builder().name("put-cassandra-record-statement-type").displayName("Statement Type").description("Specifies the type of CQL Statement to generate.").required(true).defaultValue(INSERT_TYPE.getValue()).allowableValues(new AllowableValue[]{UPDATE_TYPE, INSERT_TYPE, STATEMENT_TYPE_USE_ATTR_TYPE}).build();
    static final PropertyDescriptor UPDATE_METHOD = new PropertyDescriptor.Builder().name("put-cassandra-record-update-method").displayName("Update Method").description("Specifies the method to use to SET the values. This property is used if the Statement Type is UPDATE and ignored otherwise.").required(false).defaultValue(SET_TYPE.getValue()).allowableValues(new AllowableValue[]{INCR_TYPE, DECR_TYPE, SET_TYPE, UPDATE_METHOD_USE_ATTR_TYPE}).build();
    static final PropertyDescriptor UPDATE_KEYS = new PropertyDescriptor.Builder().name("put-cassandra-record-update-keys").displayName("Update Keys").description("A comma-separated list of column names that uniquely identifies a row in the database for UPDATE statements. If the Statement Type is UPDATE and this property is not set, the conversion to CQL will fail. This property is ignored if the Statement Type is not UPDATE.").addValidator(StandardValidators.createListValidator((boolean)true, (boolean)false, (Validator)StandardValidators.NON_EMPTY_VALIDATOR)).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder().name("put-cassandra-record-table").displayName("Table name").description("The name of the Cassandra table to which the records have to be written.").required(true).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().name("put-cassandra-record-batch-size").displayName("Batch size").description("Specifies the number of 'Insert statements' to be grouped together to execute as a batch (BatchStatement)").defaultValue("100").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).required(true).build();
    static final PropertyDescriptor BATCH_STATEMENT_TYPE = new PropertyDescriptor.Builder().name("put-cassandra-record-batch-statement-type").displayName("Batch Statement Type").description("Specifies the type of 'Batch Statement' to be used.").allowableValues(new AllowableValue[]{LOGGED_TYPE, UNLOGGED_TYPE, COUNTER_TYPE, BATCH_STATEMENT_TYPE_USE_ATTR_TYPE}).defaultValue(LOGGED_TYPE.getValue()).required(false).build();
    static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder().fromPropertyDescriptor(AbstractCassandraProcessor.CONSISTENCY_LEVEL).allowableValues(new String[]{ConsistencyLevel.SERIAL.name(), ConsistencyLevel.LOCAL_SERIAL.name()}).defaultValue(ConsistencyLevel.SERIAL.name()).build();
    private static final List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList(CONNECTION_PROVIDER_SERVICE, CONTACT_POINTS, KEYSPACE, TABLE, STATEMENT_TYPE, UPDATE_KEYS, UPDATE_METHOD, CLIENT_AUTH, USERNAME, PASSWORD, RECORD_READER_FACTORY, BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));
    private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        String batchStatementTypeProperty;
        String updateMethodProperty;
        String statementTypeProperty;
        FlowFile inputFlowFile = session.get();
        if (inputFlowFile == null) {
            return;
        }
        String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
        RecordReaderFactory recordParserFactory = (RecordReaderFactory)context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
        int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
        String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
        String updateKeys = context.getProperty(UPDATE_KEYS).evaluateAttributeExpressions(inputFlowFile).getValue();
        String statementType = statementTypeProperty = context.getProperty(STATEMENT_TYPE).getValue();
        if (STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(statementTypeProperty)) {
            statementType = inputFlowFile.getAttribute(STATEMENT_TYPE_ATTRIBUTE);
        }
        String updateMethod = updateMethodProperty = context.getProperty(UPDATE_METHOD).getValue();
        if (UPDATE_METHOD_USE_ATTR_TYPE.getValue().equals(updateMethodProperty)) {
            updateMethod = inputFlowFile.getAttribute(UPDATE_METHOD_ATTRIBUTE);
        }
        String batchStatementType = batchStatementTypeProperty = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
        if (BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equals(batchStatementTypeProperty)) {
            batchStatementType = inputFlowFile.getAttribute(BATCH_STATEMENT_TYPE_ATTRIBUTE).toUpperCase();
        }
        if (StringUtils.isEmpty((CharSequence)batchStatementType)) {
            throw new IllegalArgumentException(String.format("Batch Statement Type is not specified, FlowFile %s", inputFlowFile));
        }
        Session connectionSession = (Session)this.cassandraSession.get();
        AtomicInteger recordsAdded = new AtomicInteger(0);
        StopWatch stopWatch = new StopWatch(true);
        boolean error = false;
        try (InputStream inputStream = session.read(inputFlowFile);
             RecordReader reader = recordParserFactory.createRecordReader(inputFlowFile, inputStream, this.getLogger());){
            Record record;
            if (StringUtils.isEmpty((CharSequence)statementType)) {
                throw new IllegalArgumentException(String.format("Statement Type is not specified, FlowFile %s", inputFlowFile));
            }
            if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType) && StringUtils.isEmpty((CharSequence)updateKeys)) {
                throw new IllegalArgumentException(String.format("Update Keys are not specified, FlowFile %s", inputFlowFile));
            }
            if ((INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) || DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) && !UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) && !COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType)) {
                throw new IllegalArgumentException(String.format("Increment/Decrement Update Method can only be used with COUNTER or UNLOGGED Batch Statement Type, FlowFile %s", inputFlowFile));
            }
            RecordSchema schema = reader.getSchema();
            BatchStatement batchStatement = new BatchStatement(BatchStatement.Type.valueOf((String)batchStatementType));
            batchStatement.setSerialConsistencyLevel(ConsistencyLevel.valueOf((String)serialConsistencyLevel));
            while ((record = reader.nextRecord()) != null) {
                Statement query;
                Map recordContentMap = (Map)DataTypeUtils.convertRecordFieldtoObject((Object)record, (DataType)RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                if (INSERT_TYPE.getValue().equalsIgnoreCase(statementType)) {
                    query = this.generateInsert(cassandraTable, schema, recordContentMap);
                } else if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
                    query = this.generateUpdate(cassandraTable, schema, updateKeys, updateMethod, recordContentMap);
                } else {
                    throw new IllegalArgumentException(String.format("Statement Type %s is not valid, FlowFile %s", statementType, inputFlowFile));
                }
                if (this.getLogger().isDebugEnabled()) {
                    this.getLogger().debug("Query: {}", new Object[]{query.toString()});
                }
                batchStatement.add(query);
                if (recordsAdded.incrementAndGet() != batchSize) continue;
                connectionSession.execute((Statement)batchStatement);
                batchStatement.clear();
                recordsAdded.set(0);
            }
            if (batchStatement.size() != 0) {
                connectionSession.execute((Statement)batchStatement);
                batchStatement.clear();
            }
        }
        catch (Exception e) {
            error = true;
            this.getLogger().error("Unable to write the records into Cassandra table due to {}", new Object[]{e});
            session.transfer(inputFlowFile, REL_FAILURE);
        }
        finally {
            if (!error) {
                stopWatch.stop();
                long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                String transitUri = "cassandra://" + connectionSession.getCluster().getMetadata().getClusterName() + "." + cassandraTable;
                session.getProvenanceReporter().send(inputFlowFile, transitUri, "Inserted " + recordsAdded.get() + " records", duration);
                session.transfer(inputFlowFile, REL_SUCCESS);
            }
        }
    }

    protected Statement generateUpdate(String cassandraTable, RecordSchema schema, String updateKeys, String updateMethod, Map<String, Object> recordContentMap) {
        Update updateQuery;
        Set updateKeyNames = Arrays.stream(updateKeys.split(",")).map(String::trim).filter(StringUtils::isNotEmpty).collect(Collectors.toSet());
        if (updateKeyNames.isEmpty()) {
            throw new IllegalArgumentException("No Update Keys were specified");
        }
        for (String updateKey : updateKeyNames) {
            if (schema.getFieldNames().contains(updateKey)) continue;
            throw new IllegalArgumentException("Update key '" + updateKey + "' is not present in the record schema");
        }
        if (cassandraTable.contains(".")) {
            String[] keyspaceAndTable = cassandraTable.split("\\.");
            updateQuery = QueryBuilder.update((String)keyspaceAndTable[0], (String)keyspaceAndTable[1]);
        } else {
            updateQuery = QueryBuilder.update((String)cassandraTable);
        }
        for (String fieldName : schema.getFieldNames()) {
            Assignment assignment;
            Object fieldValue = recordContentMap.get(fieldName);
            if (updateKeyNames.contains(fieldName)) {
                updateQuery.where(QueryBuilder.eq((String)fieldName, (Object)fieldValue));
                continue;
            }
            if (SET_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
                assignment = QueryBuilder.set((String)fieldName, (Object)fieldValue);
            } else if (INCR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
                assignment = QueryBuilder.incr((String)fieldName, (long)this.convertFieldObjectToLong(fieldName, fieldValue));
            } else if (DECR_TYPE.getValue().equalsIgnoreCase(updateMethod)) {
                assignment = QueryBuilder.decr((String)fieldName, (long)this.convertFieldObjectToLong(fieldName, fieldValue));
            } else {
                throw new IllegalArgumentException("Update Method '" + updateMethod + "' is not valid.");
            }
            updateQuery.with(assignment);
        }
        return updateQuery;
    }

    private Long convertFieldObjectToLong(String name, Object value) {
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException("Field '" + name + "' is not of type Number");
        }
        return ((Number)value).longValue();
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        Set results = (Set)super.customValidate(validationContext);
        String statementType = validationContext.getProperty(STATEMENT_TYPE).getValue();
        if (UPDATE_TYPE.getValue().equalsIgnoreCase(statementType)) {
            String updateKeys = validationContext.getProperty(UPDATE_KEYS).getValue();
            if (StringUtils.isEmpty((CharSequence)updateKeys)) {
                results.add(new ValidationResult.Builder().subject("Update statement configuration").valid(false).explanation("if the Statement Type is set to Update, then the Update Keys must be specified as well").build());
            }
            String updateMethod = validationContext.getProperty(UPDATE_METHOD).getValue();
            String batchStatementType = validationContext.getProperty(BATCH_STATEMENT_TYPE).getValue();
            if (!(!INCR_TYPE.getValue().equalsIgnoreCase(updateMethod) && !DECR_TYPE.getValue().equalsIgnoreCase(updateMethod) || COUNTER_TYPE.getValue().equalsIgnoreCase(batchStatementType) || UNLOGGED_TYPE.getValue().equalsIgnoreCase(batchStatementType) || BATCH_STATEMENT_TYPE_USE_ATTR_TYPE.getValue().equalsIgnoreCase(batchStatementType))) {
                results.add(new ValidationResult.Builder().subject("Update method configuration").valid(false).explanation("if the Update Method is set to Increment or Decrement, then the Batch Statement Type must be set to either COUNTER or UNLOGGED").build());
            }
        }
        return results;
    }

    protected Statement generateInsert(String cassandraTable, RecordSchema schema, Map<String, Object> recordContentMap) {
        Insert insertQuery;
        if (cassandraTable.contains(".")) {
            String[] keyspaceAndTable = cassandraTable.split("\\.");
            insertQuery = QueryBuilder.insertInto((String)keyspaceAndTable[0], (String)keyspaceAndTable[1]);
        } else {
            insertQuery = QueryBuilder.insertInto((String)cassandraTable);
        }
        for (String fieldName : schema.getFieldNames()) {
            DataType fieldDataType;
            Object[] array;
            Object value = recordContentMap.get(fieldName);
            if (value != null && value.getClass().isArray() && (array = (Object[])value).length > 0 && array[0] instanceof Byte) {
                Object[] temp = value;
                byte[] newArray = new byte[temp.length];
                for (int x = 0; x < temp.length; ++x) {
                    newArray[x] = (Byte)temp[x];
                }
                value = ByteBuffer.wrap(newArray);
            }
            if (schema.getDataType(fieldName).isPresent() && (fieldDataType = (DataType)schema.getDataType(fieldName).get()).getFieldType() == RecordFieldType.ARRAY && ((ArrayDataType)fieldDataType).getElementType().getFieldType() == RecordFieldType.STRING) {
                value = Arrays.stream((Object[])value).toArray(String[]::new);
            }
            insertQuery.value(fieldName, value);
        }
        return insertQuery;
    }

    @Override
    @OnUnscheduled
    public void stop(ProcessContext context) {
        super.stop(context);
    }

    @OnShutdown
    public void shutdown(ProcessContext context) {
        super.stop(context);
    }
}

