/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.datastax.driver.core.exceptions.QueryValidationException;
import com.google.common.annotations.VisibleForTesting;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.cassandra.AbstractCassandraProcessor;
import org.apache.nifi.util.StopWatch;

@Tags(value={"cassandra", "cql", "select"})
@EventDriven
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription(value="Execute provided Cassandra Query Language (CQL) select query on a Cassandra 1.x, 2.x, or 3.0.x cluster. Query result may be converted to Avro or JSON format. Streaming is used so arbitrarily large result sets are supported. This processor can be scheduled to run on a timer, or cron expression, using the standard scheduling methods, or it can be triggered by an incoming FlowFile. If it is triggered by an incoming FlowFile, then attributes of that FlowFile will be available when evaluating the select query. FlowFile attribute 'executecql.row.count' indicates how many rows were selected.")
@WritesAttributes(value={@WritesAttribute(attribute="executecql.row.count", description="The number of rows returned by the CQL query")})
public class QueryCassandra
extends AbstractCassandraProcessor {
    /** Output format name; onTrigger compares the "Output Format" property value against it to select Avro conversion. */
    public static final String AVRO_FORMAT = "Avro";
    /** Output format name; onTrigger compares the "Output Format" property value against it to select JSON conversion. */
    public static final String JSON_FORMAT = "JSON";
    /** Name of the FlowFile attribute that onTrigger sets to the number of rows written. */
    public static final String RESULT_ROW_COUNT = "executecql.row.count";
    /** Required property holding the CQL select statement; evaluated against the incoming FlowFile's attributes in onTrigger. */
    public static final PropertyDescriptor CQL_SELECT_QUERY = new PropertyDescriptor.Builder().name("CQL select query").description("CQL select query").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Required time-period property (default "0 seconds"); onTrigger reads it in milliseconds and treats values that are not positive as "no limit". */
    public static final PropertyDescriptor QUERY_TIMEOUT = new PropertyDescriptor.Builder().name("Max Wait Time").description("The maximum amount of time allowed for a running CQL select query. Must be of format <duration> <TimeUnit> where <duration> is a non-negative integer and TimeUnit is a supported Time Unit, such as: nanos, millis, secs, mins, hrs, days. A value of zero means there is no limit. ").defaultValue("0 seconds").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    /** Required integer property (default "0"); onScheduled applies it to the cluster's query options only when it is greater than zero. */
    public static final PropertyDescriptor FETCH_SIZE = new PropertyDescriptor.Builder().name("Fetch size").description("The number of result rows to be fetched from the result set at a time. Zero is the default and means there is no limit.").defaultValue("0").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.INTEGER_VALIDATOR).build();
    /** Required property restricted to "Avro" or "JSON" (default "Avro"); selects which conversion onTrigger applies to the result set. */
    public static final PropertyDescriptor OUTPUT_FORMAT = new PropertyDescriptor.Builder().name("Output Format").description("The format to which the result rows will be converted. If JSON is selected, the output will contain an object with field 'results' containing an array of result rows. Each row in the array is a map of the named column to its value. For example: { \"results\": [{\"userid\":1, \"name\":\"Joe Smith\"}]}").required(true).allowableValues(new String[]{"Avro", "JSON"}).defaultValue("Avro").build();
    /**
     * Required property holding the date pattern used by getFormattedDate when Date values are rendered as JSON
     * (default "yyyy-MM-dd HH:mm:ssZ"). The inline validator accepts any pattern that SimpleDateFormat can
     * construct and use to format the current time.
     */
    public static final PropertyDescriptor TIMESTAMP_FORMAT_PATTERN = new PropertyDescriptor.Builder().name("timestamp-format-pattern").displayName("Timestamp Format Pattern for JSON output").description("Pattern to use when converting timestamp fields to JSON. Note: the formatted timestamp will be in UTC timezone.").required(true).defaultValue("yyyy-MM-dd HH:mm:ssZ").addValidator((subject, input, context) -> {
        ValidationResult.Builder vrb = new ValidationResult.Builder().subject(subject).input(input);
        try {
            // Probe the pattern by actually formatting a date; an unusable pattern throws here.
            new SimpleDateFormat(input).format(new Date());
            vrb.valid(true).explanation("Valid date format pattern");
        }
        catch (Exception ex) {
            // Any failure (bad pattern letters, null input, ...) is reported as a validation error rather than propagated.
            vrb.valid(false).explanation("the pattern is invalid: " + ex.getMessage());
        }
        return vrb.build();
    }).build();
    /** Unmodifiable list of supported properties; assigned once in the static initializer at the bottom of the class. */
    private static final List<PropertyDescriptor> propertyDescriptors;
    /** Unmodifiable set of relationships (success, failure, retry); assigned once in the static initializer. */
    private static final Set<Relationship> relationships;

    /**
     * Returns the relationships this processor can route FlowFiles to.
     *
     * @return the shared unmodifiable set populated by the static initializer
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Returns the properties this processor supports.
     *
     * @return the shared unmodifiable list populated by the static initializer
     */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Runs the superclass scheduling logic and then, when the "Fetch size" property is greater than zero,
     * applies it to the query options of the current cluster.
     *
     * <p>The cluster instance is read once from its holder, and that same instance is both locked and
     * updated. Re-reading the holder inside the synchronized block could update a different object than
     * the one being locked if the holder were changed in between.
     *
     * @param context the process context supplying the configured property values
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        int fetchSize = context.getProperty(FETCH_SIZE).evaluateAttributeExpressions().asInteger();
        if (fetchSize > 0) {
            final Cluster activeCluster = (Cluster)this.cluster.get();
            synchronized (activeCluster) {
                activeCluster.getConfiguration().getQueryOptions().setFetchSize(fetchSize);
            }
        }
    }

    /**
     * Executes the configured CQL select query and streams the result set into a FlowFile as Avro or JSON.
     *
     * <p>When the processor has an incoming connection, a FlowFile is pulled from the session and its
     * attributes are used to evaluate the query, timeout and charset properties. When no FlowFile is
     * available (and the method did not return early) a new FlowFile is created to hold the results.
     *
     * <p>Routing:
     * <ul>
     *   <li>success: the query ran; the row count and MIME type attributes are set on the FlowFile;</li>
     *   <li>retry (penalized): {@link NoHostAvailableException} or {@link QueryExecutionException};</li>
     *   <li>failure (penalized): {@link QueryValidationException} or {@link ProcessException} when the
     *       processor has an incoming connection;</li>
     *   <li>without an incoming connection those two errors remove the FlowFile and yield the processor.</li>
     * </ul>
     *
     * @param context the process context supplying property values and connection information
     * @param session the session used to obtain, write, and route the FlowFile
     * @throws ProcessException declared by the signature; the ProcessException raised by the write
     *         callback is caught and handled in this method
     */
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile fileToProcess = null;
        if (context.hasIncomingConnection()) {
            fileToProcess = session.get();
            // An upstream (non-loop) connection exists but nothing is queued: nothing to do this round.
            if (fileToProcess == null && context.hasNonLoopConnection()) {
                return;
            }
        }
        final ComponentLog logger = this.getLogger();
        final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
        final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
        final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
        final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
        final StopWatch stopWatch = new StopWatch(true);
        // From here on fileToProcess is never null, so every catch block below can route or remove it.
        if (fileToProcess == null) {
            fileToProcess = session.create();
        }
        try {
            final Session connectionSession = (Session)this.cassandraSession.get();
            final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
            final AtomicLong nrOfRows = new AtomicLong(0L);
            fileToProcess = session.write(fileToProcess, new OutputStreamCallback(){

                public void process(OutputStream rawOut) throws IOException {
                    try (BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
                        logger.debug("Executing CQL query {}", new Object[]{selectQuery});
                        // A non-positive timeout means "wait indefinitely"; the converters receive (0, null) in that case.
                        final boolean bounded = queryTimeout > 0L;
                        final ResultSet resultSet = bounded
                                ? queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS)
                                : queryFuture.getUninterruptibly();
                        final long fetchTimeout = bounded ? queryTimeout : 0L;
                        final TimeUnit fetchTimeUnit = bounded ? TimeUnit.MILLISECONDS : null;
                        if (AVRO_FORMAT.equals(outputFormat)) {
                            nrOfRows.set(convertToAvroStream(resultSet, out, fetchTimeout, fetchTimeUnit));
                        } else if (JSON_FORMAT.equals(outputFormat)) {
                            nrOfRows.set(convertToJsonStream(Optional.of(context), resultSet, out, charset, fetchTimeout, fetchTimeUnit));
                        }
                    }
                    catch (InterruptedException | ExecutionException | TimeoutException e) {
                        // Surface fetch/timeout problems as ProcessException so the outer handler can route the FlowFile.
                        throw new ProcessException(e);
                    }
                }
            });
            fileToProcess = session.putAttribute(fileToProcess, RESULT_ROW_COUNT, String.valueOf(nrOfRows.get()));
            fileToProcess = session.putAttribute(fileToProcess, CoreAttributes.MIME_TYPE.key(), JSON_FORMAT.equals(outputFormat) ? "application/json" : "application/avro-binary");
            logger.info("{} contains {} Avro records; transferring to 'success'", new Object[]{fileToProcess, nrOfRows.get()});
            session.getProvenanceReporter().modifyContent(fileToProcess, "Retrieved " + nrOfRows.get() + " rows", stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(fileToProcess, REL_SUCCESS);
        }
        catch (NoHostAvailableException nhae) {
            logger.error("No host in the Cassandra cluster can be contacted successfully to execute this query", nhae);
            logger.error(nhae.getCustomMessage(10, true, false));
            fileToProcess = session.penalize(fileToProcess);
            session.transfer(fileToProcess, REL_RETRY);
        }
        catch (QueryExecutionException qee) {
            logger.error("Cannot execute the query with the requested consistency level successfully", qee);
            fileToProcess = session.penalize(fileToProcess);
            session.transfer(fileToProcess, REL_RETRY);
        }
        catch (QueryValidationException qve) {
            if (context.hasIncomingConnection()) {
                logger.error("The CQL query {} is invalid due to syntax error, authorization issue, or another validation problem; routing {} to failure", new Object[]{selectQuery, fileToProcess}, qve);
                fileToProcess = session.penalize(fileToProcess);
                session.transfer(fileToProcess, REL_FAILURE);
            } else {
                logger.error("The CQL query {} is invalid due to syntax error, authorization issue, or another validation problem", new Object[]{selectQuery}, qve);
                session.remove(fileToProcess);
                context.yield();
            }
        }
        catch (ProcessException e) {
            if (context.hasIncomingConnection()) {
                logger.error("Unable to execute CQL select query {} for {} due to {}; routing to failure", new Object[]{selectQuery, fileToProcess, e});
                fileToProcess = session.penalize(fileToProcess);
                session.transfer(fileToProcess, REL_FAILURE);
            } else {
                // Only the self-created FlowFile is discarded; a FlowFile already routed to failure
                // must not also be removed from the session.
                logger.error("Unable to execute CQL select query {} due to {}", new Object[]{selectQuery, e});
                session.remove(fileToProcess);
                context.yield();
            }
        }
    }

    /**
     * Invoked when the processor is unscheduled; delegates entirely to the superclass {@code stop}.
     *
     * @param context the process context passed through to the superclass
     */
    @Override
    @OnUnscheduled
    public void stop(ProcessContext context) {
        super.stop(context);
    }

    /**
     * Invoked on shutdown; performs the same work as unscheduling by calling the superclass {@code stop}
     * directly (not this class's override).
     *
     * @param context the process context passed through to the superclass
     */
    @OnShutdown
    public void shutdown(ProcessContext context) {
        super.stop(context);
    }

    /**
     * Writes every row of the given result set to the output stream as an Avro data file.
     *
     * <p>The Avro schema comes from {@link #createSchema(ResultSet)}. A single record instance is reused
     * for all rows: each column is written into it by index and the record is appended to the file before
     * the next row is processed.
     *
     * @param rs        the result set to convert
     * @param outStream the stream receiving the Avro data file
     * @param timeout   maximum wait for an explicit fetch of more results; values that are not positive mean no limit
     * @param timeUnit  unit of {@code timeout}; {@code null} means no limit
     * @return the number of rows appended
     * @throws IOException          if writing the Avro data fails
     * @throws InterruptedException if waiting for more results is interrupted
     * @throws TimeoutException     if fetching more results exceeds the timeout
     * @throws ExecutionException   if fetching more results fails
     */
    public static long convertToAvroStream(ResultSet rs, OutputStream outStream, long timeout, TimeUnit timeUnit) throws IOException, InterruptedException, TimeoutException, ExecutionException {
        final Schema schema = createSchema(rs);
        final GenericData.Record reusableRecord = new GenericData.Record(schema);
        final DatumWriter<GenericData.Record> recordWriter = new GenericDatumWriter<GenericData.Record>(schema);
        try (DataFileWriter<GenericData.Record> avroFile = new DataFileWriter<GenericData.Record>(recordWriter)) {
            avroFile.create(schema, outStream);

            final ColumnDefinitions columns = rs.getColumnDefinitions();
            if (columns == null) {
                // No column metadata: the file contains only the header/schema.
                return 0L;
            }

            final boolean waitIndefinitely = timeout <= 0L || timeUnit == null;
            long written = 0L;
            do {
                // Nothing buffered locally: explicitly request the next page and wait for it.
                if (rs.getAvailableWithoutFetching() == 0) {
                    if (waitIndefinitely) {
                        rs.fetchMoreResults().get();
                    } else {
                        rs.fetchMoreResults().get(timeout, timeUnit);
                    }
                }
                for (Row row : rs) {
                    for (int col = 0; col < columns.size(); ++col) {
                        final DataType columnType = columns.getType(col);
                        reusableRecord.put(col, row.isNull(col) ? null : getCassandraObject(row, col, columnType));
                    }
                    avroFile.append(reusableRecord);
                    written++;
                }
            } while (!rs.isFullyFetched());
            return written;
        }
    }

    /**
     * Converts the result set to JSON without a process context: delegates to the context-aware overload
     * with {@code Optional.empty()}, so Date values are formatted with the default value of
     * TIMESTAMP_FORMAT_PATTERN.
     *
     * @param rs        the result set to convert
     * @param outStream the stream receiving the JSON document
     * @param charset   the charset used to encode the JSON text
     * @param timeout   maximum wait for an explicit fetch of more results; values that are not positive mean no limit
     * @param timeUnit  unit of {@code timeout}; {@code null} means no limit
     * @return the number of rows written
     */
    public static long convertToJsonStream(ResultSet rs, OutputStream outStream, Charset charset, long timeout, TimeUnit timeUnit) throws IOException, InterruptedException, TimeoutException, ExecutionException {
        return QueryCassandra.convertToJsonStream(Optional.empty(), rs, outStream, charset, timeout, timeUnit);
    }

    /**
     * Writes the result set to the output stream as a JSON document of the form
     * {@code {"results":[{...row...},{...row...}]}}.
     *
     * <p>Each row becomes an object keyed by column name. Null columns are written as JSON {@code null};
     * List and Set values become arrays and Map values become objects whose elements are rendered by
     * {@link #getJsonElement(Optional, Object)}; every other value is rendered by that method directly.
     *
     * <p>The closing {@code ]}} is written in a {@code finally} block, so it is emitted even when
     * conversion fails part-way. It is encoded with the same {@code charset} as the rest of the document.
     * Encoding it with the platform default charset would mix encodings for charsets such as UTF-16.
     *
     * @param context   optional process context used to look up the timestamp format pattern
     * @param rs        the result set to convert
     * @param outStream the stream receiving the JSON document
     * @param charset   the charset used to encode all JSON text
     * @param timeout   maximum wait for an explicit fetch of more results; values that are not positive mean no limit
     * @param timeUnit  unit of {@code timeout}; {@code null} means no limit
     * @return the number of rows written
     * @throws IOException          if writing to the stream fails
     * @throws InterruptedException if waiting for more results is interrupted
     * @throws TimeoutException     if fetching more results exceeds the timeout
     * @throws ExecutionException   if fetching more results fails
     */
    @VisibleForTesting
    static long convertToJsonStream(Optional<ProcessContext> context, ResultSet rs, OutputStream outStream, Charset charset, long timeout, TimeUnit timeUnit) throws IOException, InterruptedException, TimeoutException, ExecutionException {
        try {
            outStream.write("{\"results\":[".getBytes(charset));
            final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
            long nrOfRows = 0L;
            if (columnDefinitions != null) {
                do {
                    // Nothing buffered locally: explicitly request the next page and wait for it.
                    if (rs.getAvailableWithoutFetching() == 0) {
                        if (timeout <= 0L || timeUnit == null) {
                            rs.fetchMoreResults().get();
                        } else {
                            rs.fetchMoreResults().get(timeout, timeUnit);
                        }
                    }
                    for (Row row : rs) {
                        // Separate rows with commas (not before the first one).
                        if (nrOfRows != 0L) {
                            outStream.write(",".getBytes(charset));
                        }
                        outStream.write("{".getBytes(charset));
                        for (int i = 0; i < columnDefinitions.size(); ++i) {
                            final DataType dataType = columnDefinitions.getType(i);
                            final String colName = columnDefinitions.getName(i);
                            if (i != 0) {
                                outStream.write(",".getBytes(charset));
                            }
                            if (row.isNull(i)) {
                                outStream.write(("\"" + colName + "\":null").getBytes(charset));
                                continue;
                            }
                            final Object value = getCassandraObject(row, i, dataType);
                            final String valueString;
                            if (value instanceof List || value instanceof Set) {
                                final StringBuilder sb = new StringBuilder("[");
                                boolean first = true;
                                for (Object element : (Collection<?>) value) {
                                    if (!first) {
                                        sb.append(",");
                                    }
                                    sb.append(getJsonElement(context, element));
                                    first = false;
                                }
                                sb.append("]");
                                valueString = sb.toString();
                            } else if (value instanceof Map) {
                                final StringBuilder sb = new StringBuilder("{");
                                boolean first = true;
                                for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                                    if (!first) {
                                        sb.append(",");
                                    }
                                    sb.append(getJsonElement(context, entry.getKey()));
                                    sb.append(":");
                                    sb.append(getJsonElement(context, entry.getValue()));
                                    first = false;
                                }
                                sb.append("}");
                                valueString = sb.toString();
                            } else {
                                valueString = getJsonElement(context, value);
                            }
                            outStream.write(("\"" + colName + "\":" + valueString).getBytes(charset));
                        }
                        ++nrOfRows;
                        outStream.write("}".getBytes(charset));
                    }
                } while (!rs.isFullyFetched());
            }
            return nrOfRows;
        }
        finally {
            // Always close the array and object, using the same charset as the rest of the document.
            outStream.write("]}".getBytes(charset));
        }
    }

    /**
     * Renders a single value as a JSON element without a process context; delegates to the
     * context-aware overload with {@code Optional.empty()}.
     *
     * @param value the value to render
     * @return the JSON text for the value
     */
    protected static String getJsonElement(Object value) {
        return QueryCassandra.getJsonElement(Optional.empty(), value);
    }

    /**
     * Renders a single value as a JSON element.
     *
     * <ul>
     *   <li>Numbers are emitted unquoted via {@code toString()}.</li>
     *   <li>Dates are formatted by {@code getFormattedDate} and quoted.</li>
     *   <li>Strings are JSON-escaped and quoted.</li>
     *   <li>Anything else is emitted as its quoted {@code toString()}, without escaping.</li>
     * </ul>
     *
     * @param context optional process context used to look up the timestamp format pattern
     * @param value   the value to render
     * @return the JSON text for the value
     */
    protected static String getJsonElement(Optional<ProcessContext> context, Object value) {
        // Numbers are the only values written without surrounding quotes.
        if (value instanceof Number) {
            return value.toString();
        }

        final String quotedContent;
        if (value instanceof Date) {
            quotedContent = getFormattedDate(context, (Date) value);
        } else if (value instanceof String) {
            quotedContent = StringEscapeUtils.escapeJson((String) value);
        } else {
            quotedContent = value.toString();
        }
        return "\"" + quotedContent + "\"";
    }

    /**
     * Formats a date in the UTC time zone using the configured timestamp pattern.
     *
     * <p>The pattern is read from the TIMESTAMP_FORMAT_PATTERN property of the supplied context; when no
     * context is present, or the property has no value, the property's default value is used. A new
     * formatter is created on every call.
     *
     * @param context optional process context supplying the pattern
     * @param value   the date to format
     * @return the formatted date
     */
    private static String getFormattedDate(Optional<ProcessContext> context, Date value) {
        final Optional<String> configuredPattern =
                context.map(ctx -> ctx.getProperty(TIMESTAMP_FORMAT_PATTERN).getValue());
        final String pattern = configuredPattern.orElse(TIMESTAMP_FORMAT_PATTERN.getDefaultValue());

        final SimpleDateFormat utcFormatter = new SimpleDateFormat(pattern);
        utcFormatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return utcFormatter.format(value);
    }

    /**
     * Builds the Avro record schema that describes the columns of a result set.
     *
     * <p>The record is named after the table of the first column when that name is not blank, otherwise
     * "NiFi_Cassandra_Query_Record"; the namespace is always "any.data". Set and list columns become a
     * union of null and an array of the element type; map columns become a union of null and a map of the
     * value type; every other column uses the union type produced by {@code getUnionFieldType} for its
     * primitive Avro type. A collection column that matches none of set, list or map adds no field.
     *
     * @param rs the result set whose column definitions are inspected
     * @return the Avro schema for the result rows
     * @throws IOException declared by the signature; not thrown by the code in this method
     * @throws IllegalArgumentException if a column has no data type, or a collection type has no type arguments
     */
    public static Schema createSchema(ResultSet rs) throws IOException {
        final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions();
        final int nrOfColumns = columnDefinitions == null ? 0 : columnDefinitions.size();

        // Prefer the real table name reported by the first column's metadata.
        String tableName = "NiFi_Cassandra_Query_Record";
        if (nrOfColumns > 0) {
            final String tableNameFromMeta = columnDefinitions.getTable(0);
            if (!StringUtils.isBlank(tableNameFromMeta)) {
                tableName = tableNameFromMeta;
            }
        }

        final SchemaBuilder.FieldAssembler<Schema> builder =
                SchemaBuilder.record(tableName).namespace("any.data").fields();

        // nrOfColumns is 0 when there are no column definitions, so the loop is skipped in that case.
        for (int i = 0; i < nrOfColumns; ++i) {
            final DataType dataType = columnDefinitions.getType(i);
            if (dataType == null) {
                throw new IllegalArgumentException("No data type for column[" + i + "] with name " + columnDefinitions.getName(i));
            }

            if (!dataType.isCollection()) {
                builder.name(columnDefinitions.getName(i))
                        .type(getUnionFieldType(getPrimitiveAvroTypeFromCassandraType(dataType)))
                        .noDefault();
                continue;
            }

            final List<DataType> typeArguments = dataType.getTypeArguments();
            if (typeArguments == null || typeArguments.size() == 0) {
                throw new IllegalArgumentException("Column[" + i + "] " + dataType.getName() + " is a collection but no type arguments were specified!");
            }

            final DataType firstArg = typeArguments.get(0);
            if (dataType.equals(DataType.set(firstArg)) || dataType.equals(DataType.list(firstArg))) {
                // Single-argument collections map to a nullable Avro array.
                builder.name(columnDefinitions.getName(i)).type().unionOf().nullBuilder().endNull().and().array()
                        .items(getUnionFieldType(getPrimitiveAvroTypeFromCassandraType(firstArg)))
                        .endUnion().noDefault();
            } else {
                // Anything else is treated as a two-argument collection and only handled if it is a map.
                final DataType secondArg = typeArguments.get(1);
                if (dataType.equals(DataType.map(firstArg, secondArg))) {
                    builder.name(columnDefinitions.getName(i)).type().unionOf().nullBuilder().endNull().and().map()
                            .values(getUnionFieldType(getPrimitiveAvroTypeFromCassandraType(secondArg)))
                            .endUnion().noDefault();
                }
            }
        }
        return builder.endRecord();
    }

    static {
        // Supported properties: the shared `descriptors` collection (not declared in this class;
        // presumably inherited from AbstractCassandraProcessor) followed by the query-specific ones.
        final List<PropertyDescriptor> supported = new ArrayList<PropertyDescriptor>(descriptors);
        Collections.addAll(supported, CQL_SELECT_QUERY, QUERY_TIMEOUT, FETCH_SIZE, OUTPUT_FORMAT, TIMESTAMP_FORMAT_PATTERN);
        propertyDescriptors = Collections.unmodifiableList(supported);

        // Relationships this processor routes to.
        final Set<Relationship> routes = new HashSet<Relationship>();
        Collections.addAll(routes, REL_SUCCESS, REL_FAILURE, REL_RETRY);
        relationships = Collections.unmodifiableSet(routes);
    }
}

