/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.elasticsearch;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchException;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractPutElasticsearch;
import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;

@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "elasticsearch7", "elasticsearch8", "put", "index", "record"})
@CapabilityDescription(value="A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
@WritesAttributes(value={@WritesAttribute(attribute="elasticsearch.put.error", description="The error message provided by Elasticsearch if there is an error indexing the documents."), @WritesAttribute(attribute="elasticsearch.put.error.count", description="The number of records that generated errors in the Elasticsearch _bulk API."), @WritesAttribute(attribute="elasticsearch.put.success.count", description="The number of records that were successfully processed by the Elasticsearch _bulk API.")})
@DynamicProperty(name="The name of a URL query parameter to add", value="The value of the URL query parameter", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the _bulk request body")
@SystemResourceConsideration(resource=SystemResource.MEMORY, description="The Batch of Records will be stored in memory until the bulk operation is performed.")
public class PutElasticsearchRecord
extends AbstractPutElasticsearch {
    /** Relationship named "errors"; auto-terminated by default. Used for the per-batch FlowFile holding records Elasticsearch reported as errors. */
    static final Relationship REL_FAILED_RECORDS = new Relationship.Builder().name("errors").description("If a \"Result Record Writer\" is set, any Record(s) corresponding to Elasticsearch document(s) that resulted in an \"error\" (within Elasticsearch) will be routed here.").autoTerminateDefault(true).build();
    /** Relationship named "successful_records"; auto-terminated by default. Used for the per-batch FlowFile holding records that did not error. */
    static final Relationship REL_SUCCESSFUL_RECORDS = new Relationship.Builder().name("successful_records").description("If a \"Result Record Writer\" is set, any Record(s) corresponding to Elasticsearch document(s) that did not result in an \"error\" (within Elasticsearch) will be routed here.").autoTerminateDefault(true).build();
    /** Required controller service used to parse the incoming FlowFile into records. */
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("put-es-record-reader").displayName("Record Reader").description("The record reader to use for reading incoming records from flowfiles.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    /** Derived from the parent's BATCH_SIZE descriptor, with a record-specific description and FlowFile-attribute expression language support. */
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder().fromPropertyDescriptor(AbstractPutElasticsearch.BATCH_SIZE).description("The number of records to send over in a single batch.").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional fixed @timestamp value; used as the fallback when no @timestamp record path resolves a value. */
    static final PropertyDescriptor AT_TIMESTAMP = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp").displayName("@timestamp Value").description("The value to use as the @timestamp field (required for Elasticsearch Data Streams)").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).required(false).build();
    /** Optional record path selecting a per-record index operation; falls back to the INDEX_OP property. */
    static final PropertyDescriptor INDEX_OP_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-index-op-path").displayName("Index Operation Record Path").description("A record path expression to retrieve the Index Operation field for use with Elasticsearch. If left blank the Index Operation will be determined using the main Index Operation property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional record path selecting the document ID; there is no property-level fallback (null is passed as the ID). */
    static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-id-path").displayName("ID Record Path").description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank the ID will be automatically generated by Elasticsearch.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Whether the field selected by ID_RECORD_PATH is kept in the record (default "false" nulls it out); only relevant when ID_RECORD_PATH is set. */
    static final PropertyDescriptor RETAIN_ID_FIELD = new PropertyDescriptor.Builder().name("put-es-record-retain-id-field").displayName("Retain ID (Record Path)").description("Whether to retain the existing field used as the ID Record Path.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("false").required(false).dependsOn(ID_RECORD_PATH, new AllowableValue[0]).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional record path selecting a per-record index name; falls back to the INDEX property. */
    static final PropertyDescriptor INDEX_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-index-record-path").displayName("Index Record Path").description("A record path expression to retrieve the index field for use with Elasticsearch. If left blank the index will be determined using the main index property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional record path selecting a per-record document type; falls back to the TYPE property. */
    static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-type-record-path").displayName("Type Record Path").description("A record path expression to retrieve the type field for use with Elasticsearch. If left blank the type will be determined using the main type property.").addValidator((Validator)new RecordPathValidator()).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Optional record path selecting the per-record @timestamp; falls back to the AT_TIMESTAMP property. */
    static final PropertyDescriptor AT_TIMESTAMP_RECORD_PATH = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-path").displayName("@timestamp Record Path").description("A RecordPath pointing to a field in the record(s) that contains the @timestamp for the document. If left blank the @timestamp will be determined using the main @timestamp property").addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    /** Whether the field selected by AT_TIMESTAMP_RECORD_PATH is kept in the record (default "false" nulls it out); only relevant when that path is set. */
    static final PropertyDescriptor RETAIN_AT_TIMESTAMP_FIELD = new PropertyDescriptor.Builder().name("put-es-record-retain-at-timestamp-field").displayName("Retain @timestamp (Record Path)").description("Whether to retain the existing field used as the @timestamp Record Path.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("false").required(false).dependsOn(AT_TIMESTAMP_RECORD_PATH, new AllowableValue[0]).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    static final PropertyDescriptor RESULT_RECORD_WRITER = new PropertyDescriptor.Builder().name("put-es-record-error-writer").displayName("Result Record Writer").description("If this configuration property is set, the response from Elasticsearch will be examined for failed records and the failed records will be written to a record set with this record writer service and sent to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. Successful records will be written to a record setwith this record writer service and sent to the \"" + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.").identifiesControllerService(RecordSetWriterFactory.class).addValidator(Validator.VALID).required(false).build();
    static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new PropertyDescriptor.Builder().name("put-es-record-not_found-is-error").displayName("Treat \"Not Found\" as Error").description("If true, \"not_found\" Elasticsearch Document associated Records will be routed to the \"" + REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.").addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").required(false).dependsOn(RESULT_RECORD_WRITER, new AllowableValue[0]).build();
    /** Optional format applied to DATE-typed fields; the default is RecordFieldType.DATE's default format. */
    static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-date-format").displayName("Date Format").description("Specifies the format to use when writing Date fields. If not specified, the default format '" + RecordFieldType.DATE.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/25/2017).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    /** Optional format applied to TIME-typed fields; the default is RecordFieldType.TIME's default format. */
    static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-time-format").displayName("Time Format").description("Specifies the format to use when writing Time fields. If not specified, the default format '" + RecordFieldType.TIME.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    /** Optional format applied to TIMESTAMP-typed fields; the default is RecordFieldType.TIMESTAMP's default format. */
    static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder().name("put-es-record-at-timestamp-timestamp-format").displayName("Timestamp Format").description("Specifies the format to use when writing Timestamp fields. If not specified, the default format '" + RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/25/2017 18:04:15).").expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator((Validator)new SimpleDateFormatValidator()).required(false).build();
    /** Immutable list of every property this processor exposes; includes descriptors inherited from the parent class (INDEX_OP, INDEX, TYPE, CLIENT_SERVICE, LOG_ERROR_RESPONSES). */
    static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD, INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD, DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL));
    /** Immutable set of every relationship: the inherited success/failure/retry plus the two per-record result relationships. */
    static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, REL_SUCCESSFUL_RECORDS)));
    // The fields below are (re)assigned in onScheduled and read while processing FlowFiles.
    // Cache of compiled record paths; created with a size of 16 in onScheduled.
    private RecordPathCache recordPathCache;
    // Reader service resolved from RECORD_READER.
    private RecordReaderFactory readerFactory;
    // Writer service resolved from RESULT_RECORD_WRITER; null when that property is not set.
    private RecordSetWriterFactory writerFactory;
    // Effective date/time/timestamp formats: the configured value, or the RecordFieldType default when unset.
    private volatile String dateFormat;
    private volatile String timeFormat;
    private volatile String timestampFormat;

    /**
     * Exposes the relationships supported by this processor.
     *
     * @return the shared, unmodifiable {@link #RELATIONSHIPS} set
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Exposes the properties supported by this processor.
     *
     * @return the shared, unmodifiable {@link #DESCRIPTORS} list
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * Resolves the controller services and format settings once per scheduling so that
     * {@code onTrigger} does not have to look them up for every FlowFile.
     *
     * @param context the process context supplying the configured property values
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);

        this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        this.recordPathCache = new RecordPathCache(16);
        this.writerFactory = context.getProperty(RESULT_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        this.notFoundIsSuccessful = context.getProperty(NOT_FOUND_IS_SUCCESSFUL).asBoolean();

        // Each format falls back to the record framework's default when the property is unset.
        final String configuredDateFormat = context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
        this.dateFormat = configuredDateFormat == null
                ? RecordFieldType.DATE.getDefaultFormat()
                : configuredDateFormat;

        final String configuredTimeFormat = context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
        this.timeFormat = configuredTimeFormat == null
                ? RecordFieldType.TIME.getDefaultFormat()
                : configuredTimeFormat;

        final String configuredTimestampFormat = context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
        this.timestampFormat = configuredTimestampFormat == null
                ? RecordFieldType.TIMESTAMP.getDefaultFormat()
                : configuredTimestampFormat;
    }

    /**
     * Reads the incoming FlowFile record by record, converts each record into an
     * {@link IndexOperationRequest} and sends the requests to Elasticsearch in batches of the
     * configured size.
     *
     * <p>On success the input FlowFile is annotated with the error/success counts and routed to
     * {@code REL_SUCCESS}. On any exception the input is routed to retry or failure (see the catch
     * blocks) and every result FlowFile collected so far is removed.
     *
     * @param context the process context supplying property values
     * @param session the session used to read, create and transfer FlowFiles
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }

        final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(input).getValue();
        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
        final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();
        final String atTimestamp = context.getProperty(AT_TIMESTAMP).evaluateAttributeExpressions(input).getValue();

        final String indexOpPath = context.getProperty(INDEX_OP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
        final String idPath = context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
        final String indexPath = context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
        final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
        final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();

        // All record paths are treated alike: a null or blank value (e.g. an expression that
        // evaluates to an empty string) means "not configured" and is never handed to the compiler.
        final RecordPath ioPath = this.compileRecordPath(indexOpPath);
        final RecordPath path = this.compileRecordPath(idPath);
        final RecordPath iPath = this.compileRecordPath(indexPath);
        final RecordPath tPath = this.compileRecordPath(typePath);
        final RecordPath atPath = this.compileRecordPath(atTimestampPath);

        final boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
        final boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();

        final List<FlowFile> resultRecords = new ArrayList<>();
        final AtomicLong erroredRecords = new AtomicLong(0L);
        final AtomicLong successfulRecords = new AtomicLong(0L);
        final StopWatch stopWatch = new StopWatch(true);
        final Set<String> indices = new HashSet<>();
        final Set<String> types = new HashSet<>();
        int batches = 0;

        try (InputStream inStream = session.read(input);
             RecordReader reader = this.readerFactory.createRecordReader(input, inStream, this.getLogger())) {
            final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
            final List<IndexOperationRequest> operationList = new ArrayList<>();
            final List<Record> originals = new ArrayList<>();

            Record record;
            while ((record = recordSet.next()) != null) {
                final String idx = this.getFromRecordPath(record, iPath, index, false);
                indices.add(idx);
                final String t = this.getFromRecordPath(record, tPath, type, false);
                if (StringUtils.isNotBlank(t)) {
                    types.add(t);
                }
                final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(this.getFromRecordPath(record, ioPath, indexOp, false));
                final String id = this.getFromRecordPath(record, path, null, retainId);
                final Object timestamp = this.getTimestampFromRecordPath(record, atPath, atTimestamp, retainTimestamp);

                // The record is converted only after the path lookups above, because those may
                // null out the fields they consumed (when "retain" is false).
                @SuppressWarnings("unchecked")
                final Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
                this.formatDateTimeFields(contentMap, record);
                // Only add @timestamp when a value was actually resolved; otherwise every document
                // would carry an explicit null-valued "@timestamp" entry.
                if (timestamp != null) {
                    contentMap.putIfAbsent("@timestamp", timestamp);
                }

                operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
                originals.add(record);

                // Flush when the batch is full or when this was the last record.
                if (operationList.size() == batchSize || !recordSet.isAnotherRecord()) {
                    this.operate(operationList, originals, reader, context, session, input, resultRecords, erroredRecords, successfulRecords);
                    ++batches;
                }
            }

            if (!operationList.isEmpty()) {
                this.operate(operationList, originals, reader, context, session, input, resultRecords, erroredRecords, successfulRecords);
                ++batches;
            }
        } catch (ElasticsearchException ese) {
            final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", ese.isElastic() ? "Routing to retry." : "Routing to failure");
            this.getLogger().error(msg, ese);
            final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
            this.transferFlowFilesOnException(ese, rel, session, true, input);
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        } catch (IOException | SchemaNotFoundException ex) {
            this.getLogger().warn("Could not log Elasticsearch operation errors nor determine which documents errored.", ex);
            this.transferFlowFilesOnException(ex, REL_FAILURE, session, true, input);
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        } catch (Exception ex) {
            this.getLogger().error("Could not index documents.", ex);
            this.transferFlowFilesOnException(ex, REL_FAILURE, session, false, input);
            context.yield();
            this.removeResultRecordFlowFiles(resultRecords, session);
            return;
        }

        stopWatch.stop();
        final String transitUrl = ((ElasticSearchClientService) this.clientService.get()).getTransitUrl(String.join(",", indices), types.isEmpty() ? null : String.join(",", types));
        final String provenanceDetails = String.format(Locale.getDefault(), "%d Elasticsearch _bulk operation batch(es) [%d error(s), %d success(es)]", batches, erroredRecords.get(), successfulRecords.get());
        session.getProvenanceReporter().send(input, transitUrl, provenanceDetails, stopWatch.getDuration(TimeUnit.MILLISECONDS));

        // A plain HashMap instead of a double-brace anonymous subclass, which would create an
        // extra class per call site that captures the local counters.
        final Map<String, String> countAttributes = new HashMap<>();
        countAttributes.put("elasticsearch.put.error.count", String.valueOf(erroredRecords.get()));
        countAttributes.put("elasticsearch.put.success.count", String.valueOf(successfulRecords.get()));
        input = session.putAllAttributes(input, countAttributes);
        session.transfer(input, REL_SUCCESS);
    }

    /** Compiles {@code recordPath} through the cache, or returns {@code null} when it is null or blank. */
    private RecordPath compileRecordPath(String recordPath) {
        return StringUtils.isNotBlank(recordPath) ? this.recordPathCache.getCompiled(recordPath) : null;
    }

    /**
     * Sends one batch to Elasticsearch, records the outcome and resets the batch buffers.
     *
     * @param operationList     the pending bulk operations; cleared before returning
     * @param originals         the source records matching {@code operationList}; cleared before returning
     * @param reader            the reader whose schema is attached to the batch
     * @param context           the process context, passed through to the bulk call
     * @param session           the session used to create result FlowFiles
     * @param input             the FlowFile being processed
     * @param resultRecords     collects every non-null result FlowFile returned for the batch
     * @param erroredRecords    running total of errored records, incremented by this batch's count
     * @param successfulRecords running total of successful records, incremented by this batch's count
     */
    private void operate(List<IndexOperationRequest> operationList, List<Record> originals, RecordReader reader, ProcessContext context, ProcessSession session, FlowFile input, List<FlowFile> resultRecords, AtomicLong erroredRecords, AtomicLong successfulRecords) throws IOException, SchemaNotFoundException, MalformedRecordException {
        final ResponseDetails outcome = this.indexDocuments(
                new BulkOperation(operationList, originals, reader.getSchema()), context, session, input);

        final FlowFile erroredFlowFile = outcome.getErrored();
        if (erroredFlowFile != null) {
            resultRecords.add(erroredFlowFile);
        }
        erroredRecords.addAndGet(outcome.getErrorCount());

        final FlowFile succeededFlowFile = outcome.getSucceeded();
        if (succeededFlowFile != null) {
            resultRecords.add(succeededFlowFile);
        }
        successfulRecords.addAndGet(outcome.getSuccessCount());

        // The caller reuses both lists for the next batch.
        operationList.clear();
        originals.clear();
    }

    /**
     * Removes every collected result FlowFile from the session and empties the list.
     *
     * @param results the result FlowFiles gathered so far; empty on return
     * @param session the session the FlowFiles belong to
     */
    private void removeResultRecordFlowFiles(List<FlowFile> results, ProcessSession session) {
        results.forEach(flowFile -> session.remove(flowFile));
        results.clear();
    }

    /**
     * Executes one _bulk request and, when a result record writer is configured, writes the
     * batch's original records into an "errors" FlowFile and a "successful_records" FlowFile.
     *
     * <p>A result FlowFile that ends up holding no records is removed from the session and
     * reported as {@code null} in the returned details. The caller collects every non-null
     * FlowFile and removes it again if a later batch fails, so handing back an already-removed
     * FlowFile would make the session remove it twice.
     *
     * @param bundle  the operations, their original records and the record schema
     * @param context the process context, used to build the URL query parameters
     * @param session the session used to create, write and transfer result FlowFiles
     * @param input   the parent FlowFile of any result FlowFiles
     * @return the counts and (possibly {@code null}) result FlowFiles for this batch
     * @throws IOException             if logging the response errors or writing the records fails
     * @throws SchemaNotFoundException if the writer cannot be created for the batch schema
     */
    private ResponseDetails indexDocuments(BulkOperation bundle, ProcessContext context, ProcessSession session, FlowFile input) throws IOException, SchemaNotFoundException {
        final IndexOperationResponse response = ((ElasticSearchClientService) this.clientService.get()).bulk(bundle.getOperationList(), this.getUrlQueryParameters(context, input));

        final List<Predicate<Map<String, Object>>> errorItemFilters = new ArrayList<>(2);
        if (response.hasErrors()) {
            this.logElasticsearchDocumentErrors(response);
            errorItemFilters.add(this.isElasticsearchError());
        }
        // "not_found" items only count as errors when results are being written and the user opted in.
        if (this.writerFactory != null && !this.notFoundIsSuccessful) {
            errorItemFilters.add(this.isElasticsearchNotFound());
        }

        final List<Integer> errorIndices = this.findElasticsearchResponseIndices(response, errorItemFilters.toArray(new Predicate[0]));
        final int numErrors = errorIndices.size();
        final int numSuccessful = response.getItems() == null ? 0 : response.getItems().size() - numErrors;

        FlowFile errorFF = null;
        FlowFile successFF = null;
        if (this.writerFactory != null) {
            try {
                successFF = session.create(input);
                errorFF = session.create(input);

                // Hash lookup so that splitting the batch stays linear in the number of records.
                final Set<Integer> errorIndexLookup = new HashSet<>(errorIndices);
                final List<Record> originalRecords = bundle.getOriginalRecords();

                try (OutputStream errorOutputStream = session.write(errorFF);
                     RecordSetWriter errorWriter = this.writerFactory.createWriter(this.getLogger(), bundle.getSchema(), errorOutputStream, errorFF);
                     OutputStream successOutputStream = session.write(successFF);
                     RecordSetWriter successWriter = this.writerFactory.createWriter(this.getLogger(), bundle.getSchema(), successOutputStream, successFF)) {
                    errorWriter.beginRecordSet();
                    successWriter.beginRecordSet();
                    for (int o = 0; o < originalRecords.size(); ++o) {
                        if (errorIndexLookup.contains(o)) {
                            errorWriter.write(originalRecords.get(o));
                        } else {
                            successWriter.write(originalRecords.get(o));
                        }
                    }
                    errorWriter.finishRecordSet();
                    successWriter.finishRecordSet();
                }

                if (numErrors > 0) {
                    errorFF = session.putAttribute(errorFF, "record.count", String.valueOf(numErrors));
                    session.transfer(errorFF, REL_FAILED_RECORDS);
                } else {
                    session.remove(errorFF);
                    // Already removed: do not hand it back to the caller.
                    errorFF = null;
                }

                if (numSuccessful > 0) {
                    successFF = session.putAttribute(successFF, "record.count", String.valueOf(numSuccessful));
                    session.transfer(successFF, REL_SUCCESSFUL_RECORDS);
                } else {
                    session.remove(successFF);
                    // Already removed: do not hand it back to the caller.
                    successFF = null;
                }
            } catch (IOException | SchemaNotFoundException ex) {
                // Checked exceptions can only originate from the writer block above, i.e. before
                // either FlowFile was transferred or removed, so both are still live here.
                this.getLogger().error("Unable to write error/successful records", ex);
                session.remove(errorFF);
                session.remove(successFF);
                throw ex;
            }
        }
        return new ResponseDetails(successFF, numSuccessful, errorFF, numErrors);
    }

    /**
     * Re-renders every non-null DATE, TIME and TIMESTAMP value in {@code contentMap} using the
     * configured formats, replacing the entry in place.
     *
     * @param contentMap the document content keyed by field name; modified in place
     * @param record     the source record supplying the schema and, for CHOICE fields, the raw value
     */
    private void formatDateTimeFields(Map<String, Object> contentMap, Record record) {
        for (final RecordField field : record.getSchema().getFields()) {
            final String name = field.getFieldName();
            final Object rawValue = contentMap.get(name);
            if (rawValue == null) {
                continue;
            }

            // For CHOICE fields the concrete type is picked from the record's actual value.
            final DataType declaredType = field.getDataType();
            final DataType effectiveType;
            if (declaredType.getFieldType() == RecordFieldType.CHOICE) {
                effectiveType = DataTypeUtils.chooseDataType(record.getValue(field), (ChoiceDataType) declaredType);
            } else {
                effectiveType = declaredType;
            }

            // A null pattern means the field is not a date/time type and is left untouched.
            final String pattern = this.determineDateFormat(effectiveType.getFieldType());
            if (pattern == null) {
                continue;
            }

            final String rendered = DataTypeUtils.toString(rawValue, () -> DataTypeUtils.getDateFormat(pattern));
            contentMap.put(name, this.coerceStringToLong(name, rendered));
        }
    }

    /**
     * Resolves a string value from the first field selected by {@code path}.
     *
     * @param record   the record to evaluate the path against
     * @param path     the compiled record path, or {@code null} when none is configured
     * @param fallback the value returned when there is no path, no selected field or a null value
     * @param retain   when {@code false}, the selected field is set to {@code null} in the record
     *                 after its value has been read
     * @return the selected field's value as a string, or {@code fallback}
     * @throws ProcessException if the selected field is not declared as a STRING
     */
    private String getFromRecordPath(Record record, RecordPath path, String fallback, boolean retain) {
        if (path == null) {
            return fallback;
        }

        final Optional<FieldValue> selected = path.evaluate(record).getSelectedFields().findFirst();
        if (!selected.isPresent() || selected.get().getValue() == null) {
            return fallback;
        }

        final FieldValue fieldValue = selected.get();
        if (fieldValue.getField().getDataType().getFieldType() != RecordFieldType.STRING) {
            throw new ProcessException(String.format("Field referenced by %s must be a string.", path.getPath()));
        }

        // Capture the selected value itself (not the FieldValue wrapper's toString()) and do so
        // before the field is nulled out, so the result never depends on wrapper internals.
        final String resolved = fieldValue.getValue().toString();
        if (!retain) {
            fieldValue.updateValue(null);
        }
        return resolved;
    }

    /**
     * Resolves the @timestamp value for a record from the first field selected by {@code path}.
     *
     * <p>Date/time values are rendered with the configured formats; strings that look like a
     * long are returned as {@code Long}, integral types are returned as numbers.
     *
     * @param record   the record to evaluate the path against
     * @param path     the compiled record path, or {@code null} when none is configured
     * @param fallback the configured @timestamp value used when the path yields nothing
     * @param retain   when {@code false}, the selected field is set to {@code null} in the record
     *                 after its value has been read
     * @return the timestamp value, or {@code null} when nothing could be resolved
     * @throws ProcessException if the selected field's type cannot be used as a timestamp
     */
    private Object getTimestampFromRecordPath(Record record, RecordPath path, String fallback, boolean retain) {
        if (path == null) {
            return this.coerceStringToLong("@timestamp", fallback);
        }

        final Optional<FieldValue> selected = path.evaluate(record).getSelectedFields().findFirst();
        if (!selected.isPresent() || selected.get().getValue() == null) {
            return this.coerceStringToLong("@timestamp", fallback);
        }

        final FieldValue fieldValue = selected.get();
        final DataType dataType = fieldValue.getField().getDataType();
        final String fieldName = fieldValue.getField().getFieldName();

        // For CHOICE fields pick the concrete type from the field's actual value. Passing the
        // Optional wrapper here would make the choice depend on the wrapper object instead.
        final DataType chosenDataType = dataType.getFieldType() == RecordFieldType.CHOICE
                ? DataTypeUtils.chooseDataType(fieldValue.getValue(), (ChoiceDataType) dataType)
                : dataType;
        if (chosenDataType == null) {
            // No CHOICE sub-type matched the value; fail with a clear message rather than an NPE.
            throw new ProcessException(String.format("Cannot use %s field referenced by %s as @timestamp.", dataType, path.getPath()));
        }

        final Object coercedValue = DataTypeUtils.convertType(fieldValue.getValue(), chosenDataType, fieldName);
        if (coercedValue == null) {
            return null;
        }

        final Object returnValue;
        switch (chosenDataType.getFieldType()) {
            case DATE:
            case TIME:
            case TIMESTAMP: {
                final String format = this.determineDateFormat(chosenDataType.getFieldType());
                returnValue = this.coerceStringToLong(fieldName, DataTypeUtils.toString(coercedValue, () -> DataTypeUtils.getDateFormat(format)));
                break;
            }
            case LONG: {
                returnValue = DataTypeUtils.toLong(coercedValue, fieldName);
                break;
            }
            case INT:
            case BYTE:
            case SHORT: {
                returnValue = DataTypeUtils.toInteger(coercedValue, fieldName);
                break;
            }
            case CHAR:
            case STRING: {
                returnValue = this.coerceStringToLong(fieldName, coercedValue.toString());
                break;
            }
            case BIGINT: {
                returnValue = coercedValue;
                break;
            }
            default: {
                throw new ProcessException(String.format("Cannot use %s field referenced by %s as @timestamp.", chosenDataType, path.getPath()));
            }
        }

        if (!retain) {
            fieldValue.updateValue(null);
        }
        return returnValue;
    }

    /**
     * Maps a date/time field type to the format configured for it.
     *
     * @param recordFieldType the field type to look up
     * @return the DATE, TIME or TIMESTAMP format, or {@code null} for any other type
     */
    private String determineDateFormat(RecordFieldType recordFieldType) {
        switch (recordFieldType) {
            case DATE:
                return this.dateFormat;
            case TIME:
                return this.timeFormat;
            case TIMESTAMP:
                return this.timestampFormat;
            default:
                return null;
        }
    }

    /**
     * Converts {@code stringValue} to a long when {@code DataTypeUtils} reports it as
     * long-compatible; otherwise returns it unchanged.
     *
     * @param fieldName   the field name passed to the conversion
     * @param stringValue the value to convert
     * @return the converted long, or the original string
     */
    private Object coerceStringToLong(String fieldName, String stringValue) {
        if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
            return DataTypeUtils.toLong(stringValue, fieldName);
        }
        return stringValue;
    }

    /**
     * Immutable outcome of a single bulk request: the result FlowFiles (either may be
     * {@code null}) and the number of successful and errored records.
     */
    private static class ResponseDetails {
        private final FlowFile succeeded;
        private final int successCount;
        private final FlowFile errored;
        private final int errorCount;

        /**
         * @param succeeded    FlowFile holding the successful records, or {@code null}
         * @param successCount number of successful records
         * @param errored      FlowFile holding the errored records, or {@code null}
         * @param errorCount   number of errored records
         */
        ResponseDetails(FlowFile succeeded, int successCount, FlowFile errored, int errorCount) {
            this.succeeded = succeeded;
            this.successCount = successCount;
            this.errored = errored;
            this.errorCount = errorCount;
        }

        /** @return the FlowFile holding the successful records, or {@code null} */
        public FlowFile getSucceeded() {
            return succeeded;
        }

        /** @return the number of successful records */
        public int getSuccessCount() {
            return successCount;
        }

        /** @return the FlowFile holding the errored records, or {@code null} */
        public FlowFile getErrored() {
            return errored;
        }

        /** @return the number of errored records */
        public int getErrorCount() {
            return errorCount;
        }
    }
}

