/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.InvalidTypeException;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.cassandra.AbstractCassandraProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StringUtils;

@SupportsBatching
@Tags(value={"cassandra", "cql", "put", "insert", "update", "set"})
@EventDriven
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription(value="Execute provided Cassandra Query Language (CQL) statement on a Cassandra 1.x, 2.x, or 3.0.x cluster. The content of an incoming FlowFile is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be a lowercase string indicating the Cassandra type.")
@ReadsAttributes(value={@ReadsAttribute(attribute="cql.args.N.type", description="Incoming FlowFiles are expected to be parameterized CQL statements. The type of each parameter is specified as a lowercase string corresponding to the Cassandra data type (text, int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the collection should be comma-delimited, follow the collection type, and be enclosed in angle brackets (< and >), for example set<text> or map<timestamp, int>."), @ReadsAttribute(attribute="cql.args.N.value", description="Incoming FlowFiles are expected to be parameterized CQL statements. The value of the parameters are specified as cql.args.1.value, cql.args.2.value, cql.args.3.value, and so on. The  type of the cql.args.1.value parameter is specified by the cql.args.1.type attribute.")})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
@DeprecationNotice(reason="DataStax 3 driver for Cassandra is no longer the current version and requires new components.")
public class PutCassandraQL
extends AbstractCassandraProcessor {
    public static final PropertyDescriptor STATEMENT_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").displayName("Max Wait Time").description("The maximum amount of time allowed for a running CQL select query. Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ").defaultValue("0 seconds").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor STATEMENT_CACHE_SIZE = new PropertyDescriptor.Builder().name("putcql-stmt-cache-size").displayName("Statement Cache Size").description("The maximum number of CQL Prepared Statements to cache. This can improve performance if many incoming flow files have the same CQL statement with different values for the parameters. If this property is set to zero, the cache is effectively disabled.").defaultValue("0").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();
    private static final List<PropertyDescriptor> propertyDescriptors;
    private static final Set<Relationship> relationships;
    private static final Pattern CQL_TYPE_ATTRIBUTE_PATTERN;
    private static final Pattern CQL_TYPE_PATTERN;
    @VisibleForTesting
    private ConcurrentMap<String, PreparedStatement> statementCache;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        int statementCacheSize = context.getProperty(STATEMENT_CACHE_SIZE).evaluateAttributeExpressions().asInteger();
        this.statementCache = CacheBuilder.newBuilder().maximumSize((long)statementCacheSize).build().asMap();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        ComponentLog logger = this.getLogger();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        long startNanos = System.nanoTime();
        long statementTimeout = context.getProperty(STATEMENT_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.MILLISECONDS);
        Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        Session connectionSession = (Session)this.cassandraSession.get();
        String cql = this.getCQL(session, flowFile, charset);
        try {
            PreparedStatement statement = (PreparedStatement)this.statementCache.get(cql);
            if (statement == null) {
                statement = connectionSession.prepare(cql);
                this.statementCache.put(cql, statement);
            }
            BoundStatement boundStatement = statement.bind();
            Map attributes = flowFile.getAttributes();
            for (Map.Entry entry : attributes.entrySet()) {
                String key = (String)entry.getKey();
                Matcher matcher = CQL_TYPE_ATTRIBUTE_PATTERN.matcher(key);
                if (!matcher.matches()) continue;
                int parameterIndex = Integer.parseInt(matcher.group(1));
                String paramType = (String)entry.getValue();
                if (StringUtils.isEmpty((String)paramType)) {
                    throw new ProcessException("Value of the " + key + " attribute is null or empty, it must contain a valid value");
                }
                paramType = paramType.trim();
                String valueAttrName = "cql.args." + parameterIndex + ".value";
                String parameterValue = (String)attributes.get(valueAttrName);
                try {
                    this.setStatementObject(boundStatement, parameterIndex - 1, valueAttrName, parameterValue, paramType);
                }
                catch (InvalidTypeException | IllegalArgumentException e) {
                    throw new ProcessException("The value of the " + valueAttrName + " is '" + parameterValue + "', which cannot be converted into the necessary data type: " + paramType, e);
                }
            }
            try {
                ResultSetFuture future = connectionSession.executeAsync((Statement)boundStatement);
                if (statementTimeout > 0L) {
                    future.getUninterruptibly(statementTimeout, TimeUnit.MILLISECONDS);
                } else {
                    future.getUninterruptibly();
                }
                long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                String transitUri = "cassandra://" + connectionSession.getCluster().getMetadata().getClusterName();
                session.getProvenanceReporter().send(flowFile, transitUri, transmissionMillis, true);
                session.transfer(flowFile, REL_SUCCESS);
            }
            catch (TimeoutException e) {
                throw new ProcessException((Throwable)e);
            }
        }
        catch (NoHostAvailableException nhae) {
            this.getLogger().error("No host in the Cassandra cluster can be contacted successfully to execute this statement", (Throwable)nhae);
            this.getLogger().error(nhae.getCustomMessage(10, true, false));
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_RETRY);
        }
        catch (QueryExecutionException qee) {
            logger.error("Cannot execute the statement with the requested consistency level successfully", (Throwable)qee);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_RETRY);
        }
        catch (QueryValidationException qve) {
            logger.error("The CQL statement {} is invalid due to syntax error, authorization issue, or another validation problem; routing {} to failure", new Object[]{cql, flowFile, qve});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
        catch (ProcessException e) {
            logger.error("Unable to execute CQL select statement {} for {}", new Object[]{cql, flowFile, e});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private String getCQL(ProcessSession session, FlowFile flowFile, Charset charset) {
        final byte[] buffer = new byte[(int)flowFile.getSize()];
        session.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.fillBuffer((InputStream)in, (byte[])buffer);
            }
        });
        return new String(buffer, charset);
    }

    protected void setStatementObject(BoundStatement statement, int paramIndex, String attrName, String paramValue, String paramType) throws IllegalArgumentException {
        if (paramValue == null) {
            statement.setToNull(paramIndex);
            return;
        }
        if (paramType == null) {
            throw new IllegalArgumentException("Parameter type for " + attrName + " cannot be null");
        }
        Matcher matcher = CQL_TYPE_PATTERN.matcher(paramType);
        if (matcher.find() && matcher.groupCount() > 1) {
            String mainTypeString = matcher.group(1).toLowerCase();
            DataType mainType = PutCassandraQL.getPrimitiveDataTypeFromString(mainTypeString);
            if (mainType != null) {
                TypeCodec typeCodec = codecRegistry.codecFor(mainType);
                if (mainType.equals(DataType.ascii()) || mainType.equals(DataType.text()) || mainType.equals(DataType.varchar()) || mainType.equals(DataType.inet()) || mainType.equals(DataType.varint())) {
                    statement.setString(paramIndex, paramValue);
                } else if (mainType.equals(DataType.cboolean())) {
                    statement.setBool(paramIndex, ((Boolean)typeCodec.parse(paramValue)).booleanValue());
                } else if (mainType.equals(DataType.cint())) {
                    statement.setInt(paramIndex, ((Integer)typeCodec.parse(paramValue)).intValue());
                } else if (mainType.equals(DataType.bigint()) || mainType.equals(DataType.counter())) {
                    statement.setLong(paramIndex, ((Long)typeCodec.parse(paramValue)).longValue());
                } else if (mainType.equals(DataType.cfloat())) {
                    statement.setFloat(paramIndex, ((Float)typeCodec.parse(paramValue)).floatValue());
                } else if (mainType.equals(DataType.cdouble())) {
                    statement.setDouble(paramIndex, ((Double)typeCodec.parse(paramValue)).doubleValue());
                } else if (mainType.equals(DataType.blob())) {
                    statement.setBytes(paramIndex, (ByteBuffer)typeCodec.parse(paramValue));
                } else if (mainType.equals(DataType.timestamp())) {
                    statement.setTimestamp(paramIndex, (Date)typeCodec.parse(paramValue));
                } else if (mainType.equals(DataType.timeuuid()) || mainType.equals(DataType.uuid())) {
                    statement.setUUID(paramIndex, (UUID)typeCodec.parse(paramValue));
                }
                return;
            }
            if (matcher.groupCount() > 2) {
                String firstParamTypeName = matcher.group(3);
                DataType firstParamType = PutCassandraQL.getPrimitiveDataTypeFromString(firstParamTypeName);
                if (firstParamType == null) {
                    throw new IllegalArgumentException("Nested collections are not supported");
                }
                if (DataType.Name.MAP.toString().equalsIgnoreCase(mainTypeString)) {
                    if (matcher.groupCount() > 4) {
                        String secondParamTypeName = matcher.group(5);
                        DataType secondParamType = PutCassandraQL.getPrimitiveDataTypeFromString(secondParamTypeName);
                        DataType.CollectionType mapType = DataType.map((DataType)firstParamType, (DataType)secondParamType);
                        statement.setMap(paramIndex, (Map)codecRegistry.codecFor((DataType)mapType).parse(paramValue));
                        return;
                    }
                } else {
                    if (DataType.Name.SET.toString().equalsIgnoreCase(mainTypeString)) {
                        DataType.CollectionType setType = DataType.set((DataType)firstParamType);
                        statement.setSet(paramIndex, (Set)codecRegistry.codecFor((DataType)setType).parse(paramValue));
                        return;
                    }
                    if (DataType.Name.LIST.toString().equalsIgnoreCase(mainTypeString)) {
                        DataType.CollectionType listType = DataType.list((DataType)firstParamType);
                        statement.setList(paramIndex, (List)codecRegistry.codecFor((DataType)listType).parse(paramValue));
                        return;
                    }
                }
            } else {
                throw new IllegalArgumentException("Collection type " + mainTypeString + " needs parameterized type(s), such as set<text>");
            }
        }
        throw new IllegalArgumentException("Cannot create object of type " + paramType + " using input " + paramValue);
    }

    @Override
    @OnStopped
    public void stop(ProcessContext context) {
        super.stop(context);
        this.statementCache.clear();
    }

    static {
        CQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("cql\\.args\\.(\\d+)\\.type");
        CQL_TYPE_PATTERN = Pattern.compile("([^<]+)(<([^,>]+)(,([^,>]+))*>)?");
        ArrayList<PropertyDescriptor> _propertyDescriptors = new ArrayList<PropertyDescriptor>();
        _propertyDescriptors.addAll(descriptors);
        _propertyDescriptors.add(STATEMENT_TIMEOUT);
        _propertyDescriptors.add(STATEMENT_CACHE_SIZE);
        propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_relationships);
    }
}

