/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor;
import org.apache.nifi.processors.elasticsearch.IdentifierNotFoundException;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;

@Deprecated
@DeprecationNotice(classNames={"org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord"}, reason="This processor is deprecated and may be removed in future releases.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags(value={"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http", "record"})
@CapabilityDescription(value="Writes the records from a FlowFile into to Elasticsearch, using the specified parameters such as the index to insert into and the type of the document, as well as the operation type (index, upsert, delete, etc.). Note: The Bulk API is used to send the records. This means that the entire contents of the incoming flow file are read into memory, and each record is transformed into a JSON document which is added to a single HTTP request body. For very large flow files (files with a large number of records, e.g.), this could cause memory usage issues.")
@WritesAttributes(value={@WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile. This is only populated on the 'success' relationship."), @WritesAttribute(attribute="failure.count", description="The number of records found by Elasticsearch to have errors. This is only populated on the 'failure' relationship.")})
@DynamicProperty(name="A URL query parameter", value="The value to set it to", expressionLanguageScope=ExpressionLanguageScope.VARIABLE_REGISTRY, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
public class PutElasticsearchHttpRecord
extends AbstractElasticsearchHttpProcessor {
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed").build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("put-es-record-record-reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("put-es-record-record-writer").displayName("Record Writer").description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mappingfor the index it is being inserted into. This property specifies the Controller Service to use for writing out those individual records sent to 'failure'. If this is not set, then the whole FlowFile will be routed to failure (including any records which may have been inserted successfully). Note that this will only be used if Elasticsearch reports that individual records failed and that in the event that the entire FlowFile fails (e.g. in the event ES is down), the FF will be routed to failure without being interpreted by this record writer. If there is an error while attempting to route the failures, the entire FlowFile will be routed to Failure. Also if every record failed individually, the entire FlowFile will be routed to Failure without being parsed by the writer.").identifiesControllerService(RecordSetWriterFactory.class).required(false).build();
    static final PropertyDescriptor LOG_ALL_ERRORS = new PropertyDescriptor.Builder().name("put-es-record-log-all-errors").displayName("Log all errors in batch").description("After sending a batch of records, Elasticsearch will report if individual records failed to insert. As an example, this can happen if the record doesn't match the mapping for the index it is being inserted into. If this is set to true, the processor will log the failure reason for the every failed record. When set to false only the first error in the batch will be logged.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).required(false).defaultValue("false").allowableValues(new String[]{"true", "false"}).build();
    static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-id-path").displayName("Identifier Record Path").description("A RecordPath pointing to a field in the record(s) that contains the identifier for the document. If the Index Operation is \"index\" or \"create\", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Index Operations, the field's value must be non-empty.").required(false).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("put-es-record-index").displayName("Index").description("The name of the index to insert into").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createAttributeExpressionLanguageValidator((AttributeExpression.ResultType)AttributeExpression.ResultType.STRING, (boolean)true)).build();
    static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("put-es-record-type").displayName("Type").description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). This must be unset or '_doc' for Elasticsearch 7.0+.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder().name("put-es-record-index-op").displayName("Index Operation").description("The type of the operation used to index (create, index, update, upsert, delete)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).defaultValue("index").build();
    static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress", "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
    static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress", "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
    static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values", "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
    static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder().name("suppress-nulls").displayName("Suppress Null Values").description("Specifies how the writer should handle a null field").allowableValues(new AllowableValue[]{NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING}).defaultValue(NEVER_SUPPRESS.getValue()).required(true).build();
    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp").displayName("@timestamp Value").description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-path").displayName("@timestamp Record Path").description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. If left blank the @timestamp will be determined using the main @timestamp property").required(false).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder().name("Date Format").description("Specifies the format to use when reading/writing Date fields. If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder().name("Time Format").description("Specifies the format to use when reading/writing Time fields. If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder().name("Timestamp Format").description("Specifies the format to use when reading/writing Timestamp fields. If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;
    private volatile RecordPathCache recordPathCache;
    private final JsonFactory factory = new JsonFactory();
    private volatile String nullSuppression;
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;
    private volatile Boolean logAllErrors;

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>(super.customValidate(validationContext));
        String idPath = validationContext.getProperty(ID_RECORD_PATH).getValue();
        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
        if (StringUtils.isEmpty((String)idPath)) {
            switch (indexOp.toLowerCase()) {
                case "update": 
                case "upsert": 
                case "delete": 
                case "": {
                    problems.add(new ValidationResult.Builder().valid(false).subject(INDEX_OP.getDisplayName()).explanation("If Identifier Record Path is not set, Index Operation must evaluate to one of \"index\" or \"create\"").build());
                    break;
                }
            }
        }
        return problems;
    }

    @Override
    @OnScheduled
    public void setup(ProcessContext context) {
        super.setup(context);
        this.recordPathCache = new RecordPathCache(10);
        this.dateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.dateFormat == null) {
            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
        }
        this.timeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.timeFormat == null) {
            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
        }
        this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
        if (this.timestampFormat == null) {
            this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
        }
        this.logAllErrors = context.getProperty(LOG_ALL_ERRORS).asBoolean();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        HashSet<Integer> failures;
        int recordCount;
        Optional<Object> writerFactoryOptional;
        RecordReaderFactory readerFactory;
        FlowFile flowFile;
        block100: {
            Response getResponse;
            flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
            writerFactoryOptional = context.getProperty(RECORD_WRITER).isSet() ? Optional.of((RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class)) : Optional.empty();
            String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
            String password = context.getProperty(PASSWORD).evaluateAttributeExpressions(flowFile).getValue();
            OkHttpClient okHttpClient = this.getClient();
            ComponentLog logger = this.getLogger();
            String baseUrl = context.getProperty(ES_URL).evaluateAttributeExpressions().getValue().trim();
            if (StringUtils.isEmpty((String)baseUrl)) {
                throw new ProcessException("Elasticsearch URL is empty or null, this indicates an invalid Expression (missing variables, e.g.)");
            }
            HttpUrl.Builder urlBuilder = Objects.requireNonNull(HttpUrl.parse((String)baseUrl)).newBuilder().addPathSegment("_bulk");
            for (Map.Entry property : context.getProperties().entrySet()) {
                PropertyDescriptor pd = (PropertyDescriptor)property.getKey();
                if (!pd.isDynamic() || property.getValue() == null) continue;
                urlBuilder = urlBuilder.addQueryParameter(pd.getName(), context.getProperty(pd).evaluateAttributeExpressions().getValue());
            }
            URL url = urlBuilder.build().url();
            String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty((String)index)) {
                logger.error("No value for index in for {}, transferring to failure", new Object[]{flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
            String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(flowFile).getValue();
            if (StringUtils.isEmpty((String)indexOp)) {
                logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{flowFile});
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            switch (indexOp.toLowerCase()) {
                case "create": 
                case "index": 
                case "update": 
                case "upsert": 
                case "delete": {
                    break;
                }
                default: {
                    logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, flowFile});
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                }
            }
            this.nullSuppression = context.getProperty(SUPPRESS_NULLS).getValue();
            String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
            RecordPath recordPath = StringUtils.isEmpty((String)idPath) ? null : this.recordPathCache.getCompiled(idPath);
            StringBuilder sb = new StringBuilder();
            Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
            String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
            String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue();
            RecordPath atPath = StringUtils.isEmpty((String)atTimestampPath) ? null : this.recordPathCache.getCompiled(atTimestampPath);
            recordCount = 0;
            try (InputStream in = session.read(flowFile);
                 RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    Optional atPathValue;
                    String id;
                    if (recordPath != null) {
                        Optional idPathValue = recordPath.evaluate(record).getSelectedFields().findFirst();
                        if (!idPathValue.isPresent()) throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
                        if (((FieldValue)idPathValue.get()).getValue() == null) {
                            throw new IdentifierNotFoundException("Identifier Record Path specified but no value was found, transferring {} to failure.");
                        }
                        id = ((FieldValue)idPathValue.get()).getValue().toString();
                    } else {
                        id = null;
                    }
                    String timestamp = atPath != null ? (!(atPathValue = atPath.evaluate(record).getSelectedFields().findFirst()).isPresent() || ((FieldValue)atPathValue.get()).getValue() == null ? atTimestamp : atPathValue.get()) : atTimestamp;
                    if (id == null && !indexOp.equalsIgnoreCase("index") && !indexOp.equalsIgnoreCase("create")) {
                        throw new IdentifierNotFoundException("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.");
                    }
                    StringBuilder json = new StringBuilder();
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    JsonGenerator generator = this.factory.createGenerator((OutputStream)out);
                    this.writeRecord(record, generator, timestamp);
                    generator.flush();
                    generator.close();
                    json.append(out.toString(charset.name()));
                    this.buildBulkCommand(sb, index, docType, indexOp, id, json.toString());
                    ++recordCount;
                }
            }
            catch (IdentifierNotFoundException infe) {
                logger.error(infe.getMessage(), new Object[]{flowFile});
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
                logger.error("Could not parse incoming data", e);
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            RequestBody requestBody = RequestBody.create((String)sb.toString(), (MediaType)MediaType.parse((String)"application/json"));
            try {
                getResponse = this.sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
            }
            catch (Exception e) {
                logger.error("Routing to {} due to exception: {}", new Object[]{REL_FAILURE.getName(), e}, (Throwable)e);
                flowFile = session.penalize(flowFile);
                session.transfer(flowFile, REL_FAILURE);
                return;
            }
            int statusCode = getResponse.code();
            failures = new HashSet<Integer>();
            if (this.isSuccess(statusCode)) {
                try (ResponseBody responseBody = getResponse.body();){
                    if (responseBody == null) break block100;
                    byte[] bodyBytes = responseBody.bytes();
                    JsonNode responseJson = this.parseJsonResponse(new ByteArrayInputStream(bodyBytes));
                    boolean errors = responseJson.get("errors").asBoolean(false);
                    if (errors) {
                        ArrayNode itemNodeArray = (ArrayNode)responseJson.get("items");
                        if (itemNodeArray != null && itemNodeArray.size() > 0) {
                            String errorReason = null;
                            for (int i = itemNodeArray.size() - 1; i >= 0; --i) {
                                JsonNode itemNode = itemNodeArray.get(i);
                                int status = itemNode.findPath("status").asInt();
                                if (this.isSuccess(status)) continue;
                                if (errorReason == null || this.logAllErrors.booleanValue()) {
                                    String reason = itemNode.findPath("result").asText();
                                    if (StringUtils.isEmpty((String)reason)) {
                                        reason = itemNode.findPath("reason").asText();
                                    }
                                    errorReason = reason;
                                    logger.error("Failed to process record {} in FlowFile {} due to {}, transferring to failure", new Object[]{i, flowFile, errorReason});
                                }
                                failures.add(i);
                            }
                        }
                        break block100;
                    }
                    flowFile = session.putAttribute(flowFile, "record.count", Integer.toString(recordCount));
                    session.transfer(flowFile, REL_SUCCESS);
                    session.getProvenanceReporter().send(flowFile, url.toString());
                    return;
                }
                catch (IOException ioe) {
                    logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, (Throwable)ioe);
                    session.transfer(flowFile, REL_FAILURE);
                    context.yield();
                    return;
                }
                finally {
                    getResponse.close();
                }
            }
            if (statusCode / 100 == 5) {
                logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...", new Object[]{statusCode, getResponse.message()});
                session.transfer(flowFile, REL_RETRY);
                context.yield();
                return;
            }
            logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        if (!failures.isEmpty() && failures.size() == recordCount || !writerFactoryOptional.isPresent()) {
            flowFile = session.putAttribute(flowFile, "failure.count", Integer.toString(failures.size()));
            session.transfer(flowFile, REL_FAILURE);
            return;
        }
        if (failures.isEmpty()) return;
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)writerFactoryOptional.get();
        FlowFile successFlowFile = session.create(flowFile);
        FlowFile failedFlowFile = session.create(flowFile);
        try (OutputStream successOut = session.write(successFlowFile);
             OutputStream failedOut = session.write(failedFlowFile);
             InputStream in = session.read(flowFile);
             RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
            RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), reader.getSchema());
            try (RecordSetWriter successWriter = writerFactory.createWriter(this.getLogger(), schema, successOut, successFlowFile);
                 RecordSetWriter failedWriter = writerFactory.createWriter(this.getLogger(), schema, failedOut, failedFlowFile);){
                Record record;
                successWriter.beginRecordSet();
                failedWriter.beginRecordSet();
                int i = 0;
                while ((record = reader.nextRecord(false, false)) != null) {
                    if (failures.contains(i)) {
                        failedWriter.write(record);
                    } else {
                        successWriter.write(record);
                    }
                    ++i;
                }
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Failed to process {} during individual record failure handling; route whole FF to failure", new Object[]{flowFile, e});
            session.transfer(flowFile, REL_FAILURE);
            if (successFlowFile != null) {
                session.remove(successFlowFile);
            }
            if (failedFlowFile == null) return;
            session.remove(failedFlowFile);
            return;
        }
        session.putAttribute(successFlowFile, "record.count", Integer.toString(recordCount - failures.size()));
        session.putAttribute(failedFlowFile, "record.count", Integer.toString(failures.size()));
        session.putAttribute(failedFlowFile, "failure.count", Integer.toString(failures.size()));
        session.transfer(successFlowFile, REL_SUCCESS);
        session.transfer(failedFlowFile, REL_FAILURE);
        session.remove(flowFile);
    }

    private void writeRecord(Record record, JsonGenerator generator, Object atTimestamp) throws IOException {
        RecordSchema schema = record.getSchema();
        generator.writeStartObject();
        if (!(atTimestamp == null || atTimestamp instanceof String && StringUtils.isBlank((String)((String)atTimestamp)))) {
            Object atValue;
            DataType atDataType;
            if (atTimestamp instanceof FieldValue) {
                FieldValue atField = (FieldValue)atTimestamp;
                atDataType = atField.getField().getDataType();
                atValue = atField.getValue();
            } else {
                atDataType = RecordFieldType.STRING.getDataType();
                atValue = atTimestamp.toString();
            }
            Object outputValue = RecordFieldType.STRING.getDataType().equals((Object)atDataType) ? this.coerceTimestampStringToLong(atValue.toString()) : atValue;
            DataType outputDataType = outputValue.equals(atValue) ? atDataType : RecordFieldType.LONG.getDataType();
            generator.writeFieldName("@timestamp");
            this.writeValue(generator, outputValue, "@timestamp", outputDataType);
        }
        for (int i = 0; i < schema.getFieldCount(); ++i) {
            RecordField field = schema.getField(i);
            String fieldName = field.getFieldName();
            Object value = record.getValue(field);
            if (value == null) {
                if (!this.nullSuppression.equals(NEVER_SUPPRESS.getValue()) && (!this.nullSuppression.equals(SUPPRESS_MISSING.getValue()) || !record.getRawFieldNames().contains(fieldName))) continue;
                generator.writeNullField(fieldName);
                continue;
            }
            generator.writeFieldName(fieldName);
            DataType dataType = (DataType)schema.getDataType(fieldName).get();
            this.writeValue(generator, value, fieldName, dataType);
        }
        generator.writeEndObject();
    }

    private Object coerceTimestampStringToLong(String stringValue) {
        return DataTypeUtils.isLongTypeCompatible((Object)stringValue) ? DataTypeUtils.toLong((Object)stringValue, (String)"@timestamp") : stringValue;
    }

    private void writeValue(JsonGenerator generator, Object value, String fieldName, DataType dataType) throws IOException {
        if (value == null) {
            if (this.nullSuppression.equals(NEVER_SUPPRESS.getValue()) || this.nullSuppression.equals(SUPPRESS_MISSING.getValue()) && fieldName != null && !fieldName.equals("")) {
                generator.writeNullField(fieldName);
            }
            return;
        }
        DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE ? DataTypeUtils.chooseDataType((Object)value, (ChoiceDataType)((ChoiceDataType)dataType)) : dataType;
        Object coercedValue = DataTypeUtils.convertType((Object)value, (DataType)chosenDataType, (String)fieldName);
        if (coercedValue == null) {
            generator.writeNull();
            return;
        }
        switch (chosenDataType.getFieldType()) {
            case DATE: {
                String stringValue = DataTypeUtils.toString((Object)coercedValue, () -> DataTypeUtils.getDateFormat((String)this.dateFormat));
                if (DataTypeUtils.isLongTypeCompatible((Object)stringValue)) {
                    generator.writeNumber(DataTypeUtils.toLong((Object)coercedValue, (String)fieldName).longValue());
                    break;
                }
                generator.writeString(stringValue);
                break;
            }
            case TIME: {
                String stringValue = DataTypeUtils.toString((Object)coercedValue, () -> DataTypeUtils.getDateFormat((String)this.timeFormat));
                if (DataTypeUtils.isLongTypeCompatible((Object)stringValue)) {
                    generator.writeNumber(DataTypeUtils.toLong((Object)coercedValue, (String)fieldName).longValue());
                    break;
                }
                generator.writeString(stringValue);
                break;
            }
            case TIMESTAMP: {
                String stringValue = DataTypeUtils.toString((Object)coercedValue, () -> DataTypeUtils.getDateFormat((String)this.timestampFormat));
                if (DataTypeUtils.isLongTypeCompatible((Object)stringValue)) {
                    generator.writeNumber(DataTypeUtils.toLong((Object)coercedValue, (String)fieldName).longValue());
                    break;
                }
                generator.writeString(stringValue);
                break;
            }
            case DOUBLE: {
                generator.writeNumber(DataTypeUtils.toDouble((Object)coercedValue, (String)fieldName).doubleValue());
                break;
            }
            case FLOAT: {
                generator.writeNumber(DataTypeUtils.toFloat((Object)coercedValue, (String)fieldName).floatValue());
                break;
            }
            case LONG: {
                generator.writeNumber(DataTypeUtils.toLong((Object)coercedValue, (String)fieldName).longValue());
                break;
            }
            case INT: 
            case BYTE: 
            case SHORT: {
                generator.writeNumber(DataTypeUtils.toInteger((Object)coercedValue, (String)fieldName).intValue());
                break;
            }
            case CHAR: 
            case STRING: {
                generator.writeString(coercedValue.toString());
                break;
            }
            case BIGINT: {
                if (coercedValue instanceof Long) {
                    generator.writeNumber(((Long)coercedValue).longValue());
                    break;
                }
                generator.writeNumber((BigInteger)coercedValue);
                break;
            }
            case DECIMAL: {
                generator.writeNumber(DataTypeUtils.toBigDecimal((Object)coercedValue, (String)fieldName));
                break;
            }
            case BOOLEAN: {
                String stringValue = coercedValue.toString();
                if ("true".equalsIgnoreCase(stringValue)) {
                    generator.writeBoolean(true);
                    break;
                }
                if ("false".equalsIgnoreCase(stringValue)) {
                    generator.writeBoolean(false);
                    break;
                }
                generator.writeString(stringValue);
                break;
            }
            case RECORD: {
                this.writeRecord((Record)coercedValue, generator, null);
                break;
            }
            case MAP: {
                MapDataType mapDataType = (MapDataType)chosenDataType;
                DataType valueDataType = mapDataType.getValueType();
                Map map = (Map)coercedValue;
                generator.writeStartObject();
                for (Map.Entry entry : map.entrySet()) {
                    String mapKey = (String)entry.getKey();
                    Object mapValue = entry.getValue();
                    generator.writeFieldName(mapKey);
                    this.writeValue(generator, mapValue, fieldName + "." + mapKey, valueDataType);
                }
                generator.writeEndObject();
                break;
            }
            default: {
                if (coercedValue instanceof Object[]) {
                    Object[] values = (Object[])coercedValue;
                    ArrayDataType arrayDataType = (ArrayDataType)chosenDataType;
                    DataType elementType = arrayDataType.getElementType();
                    this.writeArray(values, fieldName, generator, elementType);
                    break;
                }
                generator.writeString(coercedValue.toString());
            }
        }
    }

    private void writeArray(Object[] values, String fieldName, JsonGenerator generator, DataType elementType) throws IOException {
        generator.writeStartArray();
        for (Object element : values) {
            this.writeValue(generator, element, fieldName, elementType);
        }
        generator.writeEndArray();
    }

    static {
        HashSet<Relationship> _rels = new HashSet<Relationship>();
        _rels.add(REL_SUCCESS);
        _rels.add(REL_FAILURE);
        _rels.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(_rels);
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>(COMMON_PROPERTY_DESCRIPTORS);
        descriptors.add(RECORD_READER);
        descriptors.add(RECORD_WRITER);
        descriptors.add(LOG_ALL_ERRORS);
        descriptors.add(ID_RECORD_PATH);
        descriptors.add(AT_TIMESTAMP_RECORD_PATH);
        descriptors.add(AT_TIMESTAMP);
        descriptors.add(INDEX);
        descriptors.add(TYPE);
        descriptors.add(INDEX_OP);
        descriptors.add(SUPPRESS_NULLS);
        descriptors.add(DATE_FORMAT);
        descriptors.add(TIME_FORMAT);
        descriptors.add(TIMESTAMP_FORMAT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);
    }
}

