/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.hbase;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.AbstractPutHBase;
import org.apache.nifi.hbase.put.PutColumn;
import org.apache.nifi.hbase.put.PutFlowFile;
import org.apache.nifi.hbase.util.VisibilityUtil;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@EventDriven
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"hadoop", "hbase", "put", "json"})
@CapabilityDescription(value="Adds rows to HBase based on the contents of incoming JSON documents. Each FlowFile must contain a single UTF-8 encoded JSON document, and any FlowFiles where the root element is not a single document will be routed to failure. Each JSON field name and value will become a column qualifier and value of the HBase row. Any fields with a null value will be skipped, and fields with a complex value will be handled according to the Complex Field Strategy. The row id can be specified either directly on the processor through the Row Identifier property, or can be extracted from the JSON document by specifying the Row Identifier Field Name property. This processor will hold the contents of all FlowFiles for the given batch in memory at one time.")
public class PutHBaseJSON
extends AbstractPutHBase {
    protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder().name("Row Identifier Field Name").description("Specifies the name of a JSON element whose value should be used as the row id for the given JSON document.").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    protected static final String FAIL_VALUE = "Fail";
    protected static final String WARN_VALUE = "Warn";
    protected static final String IGNORE_VALUE = "Ignore";
    protected static final String TEXT_VALUE = "Text";
    protected static final AllowableValue COMPLEX_FIELD_FAIL = new AllowableValue("Fail", "Fail", "Route entire FlowFile to failure if any elements contain complex values.");
    protected static final AllowableValue COMPLEX_FIELD_WARN = new AllowableValue("Warn", "Warn", "Provide a warning and do not include field in row sent to HBase.");
    protected static final AllowableValue COMPLEX_FIELD_IGNORE = new AllowableValue("Ignore", "Ignore", "Silently ignore and do not include in row sent to HBase.");
    protected static final AllowableValue COMPLEX_FIELD_TEXT = new AllowableValue("Text", "Text", "Use the string representation of the complex field as the value of the given column.");
    protected static final PropertyDescriptor COMPLEX_FIELD_STRATEGY = new PropertyDescriptor.Builder().name("Complex Field Strategy").description("Indicates how to handle complex fields, i.e. fields that do not have a single text value.").expressionLanguageSupported(ExpressionLanguageScope.NONE).required(true).allowableValues(new AllowableValue[]{COMPLEX_FIELD_FAIL, COMPLEX_FIELD_WARN, COMPLEX_FIELD_IGNORE, COMPLEX_FIELD_TEXT}).defaultValue(COMPLEX_FIELD_TEXT.getValue()).build();
    protected static final AllowableValue FIELD_ENCODING_STRING = new AllowableValue("String", "String", "Stores the value of each field as a UTF-8 String.");
    protected static final AllowableValue FIELD_ENCODING_BYTES = new AllowableValue("Bytes", "Bytes", "Stores the value of each field as the byte representation of the type derived from the JSON.");
    protected static final PropertyDescriptor FIELD_ENCODING_STRATEGY = new PropertyDescriptor.Builder().name("Field Encoding Strategy").description("Indicates how to store the value of each field in HBase. The default behavior is to convert each value from the JSON to a String, and store the UTF-8 bytes. Choosing Bytes will interpret the type of each field from the JSON, and convert the value to the byte representation of that type, meaning an integer will be stored as the byte representation of that integer.").required(true).allowableValues(new AllowableValue[]{FIELD_ENCODING_STRING, FIELD_ENCODING_BYTES}).defaultValue(FIELD_ENCODING_STRING.getValue()).build();

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(HBASE_CLIENT_SERVICE);
        properties.add(TABLE_NAME);
        properties.add(ROW_ID);
        properties.add(ROW_FIELD_NAME);
        properties.add(ROW_ID_ENCODING_STRATEGY);
        properties.add(COLUMN_FAMILY);
        properties.add(TIMESTAMP);
        properties.add(BATCH_SIZE);
        properties.add(COMPLEX_FIELD_STRATEGY);
        properties.add(FIELD_ENCODING_STRATEGY);
        return properties;
    }

    public Set<Relationship> getRelationships() {
        HashSet<Relationship> rels = new HashSet<Relationship>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        return rels;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        String rowId = validationContext.getProperty(ROW_ID).getValue();
        String rowFieldName = validationContext.getProperty(ROW_FIELD_NAME).getValue();
        if (StringUtils.isBlank((CharSequence)rowId) && StringUtils.isBlank((CharSequence)rowFieldName)) {
            results.add(new ValidationResult.Builder().subject(((Object)((Object)this)).getClass().getSimpleName()).explanation("Row Identifier or Row Identifier Field Name is required").valid(false).build());
        } else if (!StringUtils.isBlank((CharSequence)rowId) && !StringUtils.isBlank((CharSequence)rowFieldName)) {
            results.add(new ValidationResult.Builder().subject(((Object)((Object)this)).getClass().getSimpleName()).explanation("Row Identifier and Row Identifier Field Name can not be used together").valid(false).build());
        }
        return results;
    }

    @Override
    protected PutFlowFile createPut(ProcessSession session, ProcessContext context, FlowFile flowFile) {
        Long timestamp;
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
        String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
        String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
        boolean extractRowId = !StringUtils.isBlank((CharSequence)rowFieldName);
        String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
        String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
        String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
        if (!StringUtils.isBlank((CharSequence)timestampValue)) {
            try {
                timestamp = Long.valueOf(timestampValue);
            }
            catch (Exception e) {
                this.getLogger().error("Invalid timestamp value: " + timestampValue, (Throwable)e);
                return null;
            }
        } else {
            timestamp = null;
        }
        ObjectMapper mapper = new ObjectMapper();
        AtomicReference<Object> rootNodeRef = new AtomicReference<Object>(null);
        try {
            session.read(flowFile, in -> {
                try (BufferedInputStream bufferedIn = new BufferedInputStream(in);){
                    rootNodeRef.set(mapper.readTree((InputStream)bufferedIn));
                }
            });
        }
        catch (ProcessException pe) {
            this.getLogger().error("Failed to parse {} as JSON due to {}; routing to failure", new Object[]{flowFile, pe.toString(), pe});
            return null;
        }
        JsonNode rootNode = rootNodeRef.get();
        if (rootNode.isArray()) {
            this.getLogger().error("Root node of JSON must be a single document, found array for {}; routing to failure", new Object[]{flowFile});
            return null;
        }
        ArrayList<PutColumn> columns = new ArrayList<PutColumn>();
        AtomicReference<Object> rowIdHolder = new AtomicReference<Object>(null);
        Iterator fieldNames = rootNode.fieldNames();
        while (fieldNames.hasNext()) {
            String fieldName = (String)fieldNames.next();
            AtomicReference<Object> fieldValueHolder = new AtomicReference<Object>(null);
            JsonNode fieldNode = rootNode.get(fieldName);
            if (fieldNode.isNull()) {
                this.getLogger().debug("Skipping {} because value was null", new Object[]{fieldName});
            } else if (fieldNode.isValueNode()) {
                if ("String".equals(fieldEncodingStrategy)) {
                    Object valueBytes = this.clientService.toBytes(fieldNode.asText());
                    fieldValueHolder.set(valueBytes);
                } else {
                    fieldValueHolder.set(this.extractJNodeValue(fieldNode));
                }
            } else {
                switch (complexFieldStrategy) {
                    case "Fail": {
                        this.getLogger().error("Complex value found for {}; routing to failure", new Object[]{fieldName});
                        return null;
                    }
                    case "Warn": {
                        this.getLogger().warn("Complex value found for {}; skipping", new Object[]{fieldName});
                        break;
                    }
                    case "Text": {
                        fieldValueHolder.set(this.clientService.toBytes(fieldNode.toString()));
                        break;
                    }
                    case "Ignore": {
                        break;
                    }
                }
            }
            if (fieldValueHolder.get() == null) continue;
            if (extractRowId && fieldName.equals(rowFieldName)) {
                rowIdHolder.set(fieldNode.asText());
                continue;
            }
            byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8);
            byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8);
            byte[] colValBytes = fieldValueHolder.get();
            String visibilityStringToUse = VisibilityUtil.pickVisibilityString(columnFamily, fieldName, flowFile, context);
            PutColumn column = StringUtils.isEmpty((CharSequence)visibilityStringToUse) ? new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp) : new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp, visibilityStringToUse);
            columns.add(column);
        }
        if (extractRowId && rowIdHolder.get() == null) {
            String fieldNameStr = StringUtils.join((Iterator)rootNode.fieldNames(), (String)",");
            this.getLogger().error("Row ID field named '{}' not found in field names '{}'; routing to failure", new Object[]{rowFieldName, fieldNameStr});
            return null;
        }
        String putRowId = extractRowId ? (String)rowIdHolder.get() : rowId;
        byte[] rowKeyBytes = this.getRow(putRowId, rowIdEncodingStrategy);
        return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile);
    }

    private byte[] extractJNodeValue(JsonNode n) {
        if (n.isBoolean()) {
            return this.clientService.toBytes(n.asBoolean());
        }
        if (n.isNumber()) {
            if (n.isIntegralNumber()) {
                return this.clientService.toBytes(n.asLong());
            }
            return this.clientService.toBytes(n.asDouble());
        }
        return this.clientService.toBytes(n.asText());
    }
}

