/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.PartialFunctions;
import org.apache.nifi.processor.util.pattern.PutGroup;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.processors.standard.ConvertJSONToSQL;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.db.JdbcCommon;

@SupportsBatching
@SeeAlso(value={ConvertJSONToSQL.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@Tags(value={"sql", "put", "rdbms", "database", "update", "insert", "relational"})
@CapabilityDescription(value="Executes a SQL UPDATE or INSERT command. The content of an incoming FlowFile is expected to be the SQL command to execute. The SQL command may use the ? to escape parameters. In this case, the parameters to use must exist as FlowFile attributes with the naming convention sql.args.N.type and sql.args.N.value, where N is a positive integer. The sql.args.N.type is expected to be a number indicating the JDBC Type. The content of the FlowFile is expected to be in UTF-8 format.")
@ReadsAttributes(value={@ReadsAttribute(attribute="fragment.identifier", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine whether or not two FlowFiles belong to the same transaction."), @ReadsAttribute(attribute="fragment.count", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine how many FlowFiles are needed to complete the transaction."), @ReadsAttribute(attribute="fragment.index", description="If the <Support Fragment Transactions> property is true, this attribute is used to determine the order that the FlowFiles in a transaction should be evaluated."), @ReadsAttribute(attribute="sql.args.N.type", description="Incoming FlowFiles are expected to be parametrized SQL statements. The type of each Parameter is specified as an integer that represents the JDBC Type of the parameter."), @ReadsAttribute(attribute="sql.args.N.value", description="Incoming FlowFiles are expected to be parametrized SQL statements. The value of the Parameters are specified as sql.args.1.value, sql.args.2.value, sql.args.3.value, and so on. The type of the sql.args.1.value Parameter is specified by the sql.args.1.type attribute."), @ReadsAttribute(attribute="sql.args.N.format", description="This attribute is always optional, but default options may not always work for your data. Incoming FlowFiles are expected to be parametrized SQL statements. In some cases a format option needs to be specified, currently this is only applicable for binary data types, dates, times and timestamps. Binary Data Types (defaults to 'ascii') - ascii: each string character in your attribute value represents a single byte. This is the format provided by Avro Processors. base64: the string is a Base64 encoded string that can be decoded to bytes. hex: the string is hex encoded with all letters in upper case and no '0x' at the beginning. 
Dates/Times/Timestamps - Date, Time and Timestamp formats all support both custom formats or named format ('yyyy-MM-dd','ISO_OFFSET_DATE_TIME') as specified according to java.time.format.DateTimeFormatter. If not specified, a long value input is expected to be an unix epoch (milli seconds from 1970/1/1), or a string value in 'yyyy-MM-dd' format for Date, 'HH:mm:ss.SSS' for Time (some database engines e.g. Derby or MySQL do not support milliseconds and will truncate milliseconds), 'yyyy-MM-dd HH:mm:ss.SSS' for Timestamp is used.")})
@WritesAttributes(value={@WritesAttribute(attribute="sql.generated.key", description="If the database generated a key for an INSERT statement and the Obtain Generated Keys property is set to true, this attribute will be added to indicate the generated key, if possible. This feature is not supported by all database vendors.")})
public class PutSQL
extends AbstractSessionFactoryProcessor {
    /**
     * Controller service that supplies the JDBC connections on which the SQL statements are executed.
     * The previous description had been copied from ConvertJSONToSQL and talked about converting JSON,
     * which this processor does not do.
     */
    static final PropertyDescriptor CONNECTION_POOL = new PropertyDescriptor.Builder()
            .name("JDBC Connection Pool")
            .description("Specifies the JDBC Connection Pool to use in order to execute the SQL statement against the database.")
            .identifiesControllerService(DBCPService.class)
            .required(true)
            .build();
    /**
     * Optional SQL text; when set it takes precedence over the FlowFile content and is evaluated
     * against FlowFile attributes.
     */
    static final PropertyDescriptor SQL_STATEMENT = new PropertyDescriptor.Builder()
            .name("putsql-sql-statement")
            .displayName("SQL Statement")
            .description("The SQL statement to execute. The statement can be empty, a constant value, or built from attributes using Expression Language. If this property is specified, it will be used regardless of the content of incoming FlowFiles. If this property is empty, the content of the incoming FlowFile is expected to contain a valid SQL statement, to be issued by the processor to the database.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();

    /**
     * Auto-commit mode applied to the borrowed connection; defaults to {@code false}.
     */
    static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
            .name("database-session-autocommit")
            .displayName("Database Session AutoCommit")
            .description("The autocommit mode to set on the database connection being used. If set to false, the operation(s) will be explicitly committed or rolled back (based on success or failure respectively), if set to true the driver/database handles the commit/rollback.")
            .allowableValues(new String[]{"true", "false"})
            .defaultValue("false")
            .build();

    /**
     * Whether FlowFiles sharing a fragment.identifier are handled as one transaction; defaults to {@code true}.
     */
    static final PropertyDescriptor SUPPORT_TRANSACTIONS = new PropertyDescriptor.Builder()
            .name("Support Fragmented Transactions")
            .description("If true, when a FlowFile is consumed by this Processor, the Processor will first check the fragment.identifier and fragment.count attributes of that FlowFile. If the fragment.count value is greater than 1, the Processor will not process any FlowFile with that fragment.identifier until all are available; at that point, it will process all FlowFiles with that fragment.identifier as a single transaction, in the order specified by the FlowFiles' fragment.index attributes. This Provides atomicity of those SQL statements. Once any statement of this transaction throws exception when executing, this transaction will be rolled back. When transaction rollback happened, none of these FlowFiles would be routed to 'success'. If the <Rollback On Failure> is set true, these FlowFiles will stay in the input relationship. When the <Rollback On Failure> is set false,, if any of these FlowFiles will be routed to 'retry', all of these FlowFiles will be routed to 'retry'.Otherwise, they will be routed to 'failure'. If this value is false, these attributes will be ignored and the updates will occur independent of one another.")
            .allowableValues(new String[]{"true", "false"})
            .defaultValue("true")
            .build();

    /**
     * Optional time period to wait for the remaining fragments of a transaction.
     */
    static final PropertyDescriptor TRANSACTION_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Transaction Timeout")
            .description("If the <Support Fragmented Transactions> property is set to true, specifies how long to wait for all FlowFiles for a particular fragment.identifier attribute to arrive before just transferring all of the FlowFiles with that identifier to the 'failure' relationship")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /**
     * Preferred number of FlowFiles pulled per invocation; a positive integer, 100 by default.
     */
    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("The preferred number of FlowFiles to put to the database in a single transaction")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("100")
            .build();
    /**
     * Whether database-generated keys are copied onto the FlowFile; defaults to {@code false}.
     * The description names the attribute as {@code sql.generated.key}, which is the attribute this
     * processor actually writes (the earlier text said {@code sql.generate.key}).
     */
    static final PropertyDescriptor OBTAIN_GENERATED_KEYS = new PropertyDescriptor.Builder()
            .name("Obtain Generated Keys")
            .description("If true, any key that is automatically generated by the database will be added to the FlowFile that generated it using the sql.generated.key attribute. This may result in slightly slower performance and is not supported by all databases.")
            .allowableValues(new String[]{"true", "false"})
            .defaultValue("false")
            .build();
    /** Destination for FlowFiles whose statement was applied to the database. */
    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("A FlowFile is routed to this relationship after the database is successfully updated")
            .build();

    /** Destination for FlowFiles whose update failed in a way that may succeed on a later attempt. */
    static final Relationship REL_RETRY = new Relationship.Builder()
            .name("retry")
            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
            .build();

    /** Destination for FlowFiles whose update failed and is not expected to succeed on retry. */
    static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("A FlowFile is routed to this relationship if the database cannot be updated and retrying the operation will also fail, such as an invalid query or an integrity constraint violation")
            .build();
    // Attribute keys for fragmented transactions, taken from FragmentAttributes.
    private static final String FRAGMENT_ID_ATTR = FragmentAttributes.FRAGMENT_ID.key();
    private static final String FRAGMENT_INDEX_ATTR = FragmentAttributes.FRAGMENT_INDEX.key();
    private static final String FRAGMENT_COUNT_ATTR = FragmentAttributes.FRAGMENT_COUNT.key();
    // Names of error-related attributes.
    // NOTE(review): presumably written by addErrorAttributesToFlowFile, which is not visible here - confirm.
    private static final String ERROR_MESSAGE_ATTR = "error.message";
    private static final String ERROR_CODE_ATTR = "error.code";
    private static final String ERROR_SQL_STATE_ATTR = "error.sql.state";
    // The three fields below are (re)assigned in constructProcess(), which is annotated @OnScheduled.
    // Put-pattern pipeline that onTrigger delegates to.
    private PutGroup<FunctionContext, Connection, StatementFlowFileEnclosure> process;
    // Function produced by RollbackOnFailure.createAdjustError; also applied directly in pollFlowFiles.
    private BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> adjustError;
    // Maps exceptions to ErrorTypes (see constructProcess) and runs the per-FlowFile / per-batch work.
    private ExceptionHandler<FunctionContext> exceptionHandler;
    /**
     * Fetch step of the Put pattern: delegates to {@code pollFlowFiles} and records on the function
     * context whether the polled FlowFiles form a fragmented transaction. Yields {@code null} when
     * the poll produced nothing to process.
     */
    private final PartialFunctions.FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> {
        final FlowFilePoll polled = this.pollFlowFiles(context, session, functionContext, result);
        if (polled != null) {
            functionContext.fragmentedTransaction = polled.isFragmentedTransaction();
            return polled.getFlowFiles();
        }
        return null;
    };
    /**
     * Connection step of the Put pattern: borrows a connection from the configured DBCP service
     * (passing the first FlowFile's attributes, or an empty map when there are no FlowFiles),
     * remembers the connection's original auto-commit mode on the function context and switches it
     * to the configured mode when the two differ.
     *
     * <p>If reading or changing the auto-commit mode fails with a {@link SQLException}, the borrowed
     * connection is closed before the {@link ProcessException} is thrown; otherwise it would never be
     * handed back, because the caller only receives the connection on success.
     */
    private final PartialFunctions.InitConnection<FunctionContext, Connection> initConnection = (c, s, fc, ffs) -> {
        final Map<String, String> attributes = (ffs == null || ffs.isEmpty())
                ? Collections.emptyMap()
                : ffs.get(0).getAttributes();
        final Connection connection = c.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getConnection(attributes);
        try {
            fc.originalAutoCommit = connection.getAutoCommit();
            final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
            if (fc.originalAutoCommit != autocommit) {
                try {
                    connection.setAutoCommit(autocommit);
                } catch (final SQLFeatureNotSupportedException sfnse) {
                    // Some drivers cannot change the mode; carry on with whatever the driver uses.
                    this.getLogger().debug("setAutoCommit({}) not supported by this driver", new Object[]{autocommit});
                }
            }
        } catch (final SQLException e) {
            // Release the connection we already hold so the failure does not leak it.
            try {
                connection.close();
            } catch (final SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw new ProcessException("Failed to disable auto commit due to " + e, e);
        }
        return connection;
    };
    /**
     * Grouping used for a fragmented transaction: a single {@code FragmentedEnclosure} holds every
     * FlowFile, and each FlowFile is associated with a per-SQL-text enclosure that is shared by all
     * FlowFiles carrying the same statement. The SQL text comes from the SQL Statement property when
     * it is set, otherwise from {@code getSQL}.
     */
    private final GroupingFunction groupFragmentedTransaction = (context, session, fc, conn, flowFiles, groups, result) -> {
        final FragmentedEnclosure transaction = new FragmentedEnclosure();
        groups.add(transaction);

        final Map<String, StatementFlowFileEnclosure> enclosuresBySql = new HashMap<>();
        for (final FlowFile flowFile : flowFiles) {
            final String sql;
            if (context.getProperty(SQL_STATEMENT).isSet()) {
                sql = context.getProperty(SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue();
            } else {
                sql = this.getSQL(session, flowFile);
            }
            final StatementFlowFileEnclosure target = enclosuresBySql.computeIfAbsent(sql, StatementFlowFileEnclosure::new);
            transaction.addFlowFile(flowFile, target);
        }
    };
    /**
     * Grouping used when statements can be batched: consecutive FlowFiles with equal SQL text share
     * one enclosure. For each FlowFile the enclosure's cached statement receives the FlowFile's
     * parameters and an {@code addBatch()}; the FlowFile joins the enclosure only when the exception
     * handler reports that this step succeeded.
     */
    private final GroupingFunction groupFlowFilesBySQLBatch = (context, session, fc, conn, flowFiles, groups, result) -> {
        for (final FlowFile flowFile : flowFiles) {
            final String sql;
            if (context.getProperty(SQL_STATEMENT).isSet()) {
                sql = context.getProperty(SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue();
            } else {
                sql = this.getSQL(session, flowFile);
            }

            // Reuse the most recent enclosure only when it carries the very same SQL text.
            final StatementFlowFileEnclosure previous = groups.isEmpty() ? null : groups.get(groups.size() - 1);
            final StatementFlowFileEnclosure enclosure;
            if (previous != null && previous.getSql().equals(sql)) {
                enclosure = previous;
            } else {
                enclosure = new StatementFlowFileEnclosure(sql);
                groups.add(enclosure);
            }

            final boolean addedToBatch = this.exceptionHandler.execute(fc, flowFile, input -> {
                final PreparedStatement stmt = enclosure.getCachedStatement(conn);
                JdbcCommon.setParameters(stmt, flowFile.getAttributes());
                stmt.addBatch();
            }, this.onFlowFileError(context, session, result));
            if (addedToBatch) {
                enclosure.addFlowFile(flowFile);
            }
        }
    };
    /**
     * Grouping used when statements are executed one by one: consecutive FlowFiles with equal SQL
     * text are collected into the same enclosure; no statement is prepared at this stage.
     */
    private final GroupingFunction groupFlowFilesBySQL = (context, session, fc, conn, flowFiles, groups, result) -> {
        for (final FlowFile flowFile : flowFiles) {
            final String sql;
            if (context.getProperty(SQL_STATEMENT).isSet()) {
                sql = context.getProperty(SQL_STATEMENT).evaluateAttributeExpressions(flowFile).getValue();
            } else {
                sql = this.getSQL(session, flowFile);
            }

            final StatementFlowFileEnclosure previous = groups.isEmpty() ? null : groups.get(groups.size() - 1);
            final StatementFlowFileEnclosure enclosure;
            if (previous != null && previous.getSql().equals(sql)) {
                enclosure = previous;
            } else {
                enclosure = new StatementFlowFileEnclosure(sql);
                groups.add(enclosure);
            }
            enclosure.addFlowFile(flowFile);
        }
    };
    /**
     * Group step of the Put pattern. Picks the grouping strategy in this order of precedence:
     * per-statement grouping when generated keys are requested, fragmented-transaction grouping when
     * the polled FlowFiles form one, and batch grouping otherwise. Returns the groups it built.
     */
    final PutGroup.GroupFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> groupFlowFiles = (context, session, fc, conn, flowFiles, result) -> {
        final GroupingFunction strategy;
        if (fc.obtainKeys) {
            strategy = this.groupFlowFilesBySQL;
        } else if (fc.fragmentedTransaction) {
            strategy = this.groupFragmentedTransaction;
        } else {
            strategy = this.groupFlowFilesBySQLBatch;
        }

        final List<StatementFlowFileEnclosure> groups = new ArrayList<>();
        strategy.apply(context, session, fc, conn, flowFiles, groups, result);
        return groups;
    };
    /**
     * Put step of the Put pattern: executes the statements of one enclosure.
     *
     * <p>When the function context supports batching, the enclosure's cached statement is executed
     * as a batch and all of its FlowFiles are routed to success; failures go through
     * {@code onBatchUpdateError}. Otherwise each FlowFile gets a fresh statement (taken from the
     * per-FlowFile target enclosure for fragmented transactions), its parameters are set, the update
     * is executed and a generated key, if any, is stored in the {@code sql.generated.key} attribute;
     * failures go through {@code onFlowFileError}.
     *
     * <p>Afterwards a provenance SEND event is reported for every FlowFile that was sent, using the
     * connection's JDBC URL or a placeholder when the URL cannot be obtained.
     */
    final PutGroup.PutFlowFiles<FunctionContext, Connection, StatementFlowFileEnclosure> putFlowFiles = (context, session, fc, conn, enclosure, result) -> {
        // Typed list: the provenance loop below iterates it as FlowFiles.
        final List<FlowFile> sentFlowFiles = new ArrayList<>();

        if (fc.isSupportBatching()) {
            this.exceptionHandler.execute(fc, enclosure, input -> {
                try (PreparedStatement stmt = enclosure.getCachedStatement(conn)) {
                    stmt.executeBatch();
                    sentFlowFiles.addAll(enclosure.getFlowFiles());
                    result.routeTo(enclosure.getFlowFiles(), REL_SUCCESS);
                }
            }, this.onBatchUpdateError(context, session, result));
        } else {
            for (final FlowFile flowFile : enclosure.getFlowFiles()) {
                final StatementFlowFileEnclosure targetEnclosure = enclosure instanceof FragmentedEnclosure
                        ? ((FragmentedEnclosure) enclosure).getTargetEnclosure(flowFile)
                        : enclosure;
                this.exceptionHandler.execute(fc, flowFile, input -> {
                    try (PreparedStatement stmt = targetEnclosure.getNewStatement(conn, fc.obtainKeys)) {
                        JdbcCommon.setParameters(stmt, flowFile.getAttributes());
                        stmt.executeUpdate();

                        FlowFile sentFlowFile = flowFile;
                        final String generatedKey = this.determineGeneratedKey(stmt);
                        if (generatedKey != null) {
                            sentFlowFile = session.putAttribute(sentFlowFile, "sql.generated.key", generatedKey);
                        }
                        sentFlowFiles.add(sentFlowFile);
                        result.routeTo(sentFlowFile, REL_SUCCESS);
                    }
                }, this.onFlowFileError(context, session, result));
            }
        }

        if (!sentFlowFiles.isEmpty()) {
            String url = "jdbc://unknown-host";
            try {
                url = conn.getMetaData().getURL();
            } catch (final SQLException e) {
                // Best effort only: the URL is used purely for provenance, so keep the placeholder.
                this.getLogger().debug("Unable to determine the JDBC URL for provenance reporting; using {}", new Object[]{url, e});
            }

            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos);
            for (final FlowFile flowFile : sentFlowFiles) {
                session.getProvenanceReporter().send(flowFile, url, transmissionMillis, true);
            }
        }
    };

    /**
     * Lists the properties this processor supports, in display order.
     *
     * @return a new mutable list of the supported property descriptors
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        Collections.addAll(descriptors,
                CONNECTION_POOL,
                SQL_STATEMENT,
                SUPPORT_TRANSACTIONS,
                AUTO_COMMIT,
                TRANSACTION_TIMEOUT,
                BATCH_SIZE,
                OBTAIN_GENERATED_KEYS,
                RollbackOnFailure.ROLLBACK_ON_FAILURE);
        return descriptors;
    }

    /**
     * Cross-property validation: when auto-commit is enabled, neither fragmented-transaction support
     * nor rollback-on-failure may be enabled as well.
     *
     * @param context the validation context supplying the configured property values
     * @return one validation result per conflicting property; empty when the configuration is consistent
     */
    protected final Collection<ValidationResult> customValidate(ValidationContext context) {
        final List<ValidationResult> problems = new ArrayList<>();
        final String supportTransactions = context.getProperty(SUPPORT_TRANSACTIONS).getValue();
        final String rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).getValue();
        final String autoCommit = context.getProperty(AUTO_COMMIT).getValue();

        // Without auto-commit there is nothing that can conflict.
        if (!autoCommit.equalsIgnoreCase("true")) {
            return problems;
        }

        if (supportTransactions.equalsIgnoreCase("true")) {
            final String explanation = String.format(
                    "'%s' cannot be set to 'true' when '%s' is also set to 'true'.Transactions for batch updates cannot be supported when auto commit is set to 'true'",
                    SUPPORT_TRANSACTIONS.getDisplayName(), AUTO_COMMIT.getDisplayName());
            problems.add(new ValidationResult.Builder()
                    .subject(SUPPORT_TRANSACTIONS.getDisplayName())
                    .explanation(explanation)
                    .build());
        }
        if (rollbackOnFailure.equalsIgnoreCase("true")) {
            final String explanation = String.format(
                    "'%s' cannot be set to 'true' when '%s' is also set to 'true'.Transaction rollbacks for batch updates cannot be supported when auto commit is set to 'true'",
                    RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName(), AUTO_COMMIT.getDisplayName());
            problems.add(new ValidationResult.Builder()
                    .subject(RollbackOnFailure.ROLLBACK_ON_FAILURE.getDisplayName())
                    .explanation(explanation)
                    .build());
        }
        return problems;
    }

    /**
     * Lists the relationships this processor can route to.
     *
     * @return a new mutable set containing success, retry and failure
     */
    public Set<Relationship> getRelationships() {
        final Set<Relationship> relationships = new HashSet<>();
        Collections.addAll(relationships, REL_SUCCESS, REL_RETRY, REL_FAILURE);
        return relationships;
    }

    /**
     * Builds the error callback used for a single FlowFile. The callback first applies the routing
     * created by {@code ExceptionHandler.createOnError} for the failure and retry relationships, then
     * logs the problem; for the failure and retry destinations it also adds the error attributes to
     * the FlowFile. The result is wrapped by {@code RollbackOnFailure.createOnError}.
     *
     * @param context the process context
     * @param session the session owning the FlowFile
     * @param result the routing result being assembled
     * @return the composed error callback
     */
    private ExceptionHandler.OnError<FunctionContext, FlowFile> onFlowFileError(ProcessContext context, ProcessSession session, RoutingResult result) {
        final ExceptionHandler.OnError<FunctionContext, FlowFile> routeByErrorType =
                ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY);

        final ExceptionHandler.OnError<FunctionContext, FlowFile> routeAndReport = routeByErrorType.andThen((fc, failed, route, cause) -> {
            switch (route.destination()) {
                case Self:
                    // Stays in the incoming queue: log only, no error attributes.
                    this.getLogger().error("Failed to update database for {} due to {};", new Object[]{failed, cause, cause});
                    return;
                case Failure:
                    this.getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[]{failed, cause, cause});
                    break;
                case Retry:
                    this.getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{failed, cause, cause});
                    break;
                default:
                    return;
            }
            this.addErrorAttributesToFlowFile(session, failed, cause);
        });

        return RollbackOnFailure.createOnError(routeAndReport);
    }

    /**
     * Builds the error callback used for a whole group of FlowFiles. After the routing created by
     * {@code ExceptionHandler.createOnGroupError} has run, the FlowFiles already routed to the
     * matching relationship (failure or retry) are replaced by copies carrying the error attributes
     * wherever they belong to the failed group. Other destinations are left untouched.
     *
     * @param context the process context
     * @param session the session owning the FlowFiles
     * @param result the routing result being assembled
     * @return the composed group error callback
     */
    private ExceptionHandler.OnError<RollbackOnFailure, PartialFunctions.FlowFileGroup> onGroupError(ProcessContext context, ProcessSession session, RoutingResult result) {
        final ExceptionHandler.OnError<RollbackOnFailure, PartialFunctions.FlowFileGroup> routeGroup =
                ExceptionHandler.createOnGroupError(context, session, result, REL_FAILURE, REL_RETRY);

        return routeGroup.andThen((ctx, group, route, cause) -> {
            final Relationship target;
            switch (route.destination()) {
                case Failure:
                    target = REL_FAILURE;
                    break;
                case Retry:
                    target = REL_RETRY;
                    break;
                default:
                    return;
            }
            final List<FlowFile> alreadyRouted = this.getFlowFilesOnRelationship(result, target);
            result.getRoutedFlowFiles().put(target,
                    this.addErrorAttributesToFlowFilesInGroup(session, alreadyRouted, group.getFlowFiles(), cause));
        });
    }

    /**
     * Looks up the FlowFiles currently routed to a relationship.
     *
     * @param result the routing result to inspect
     * @param relationship the relationship of interest
     * @return the routed FlowFiles, or an empty list when nothing is routed there
     */
    private List<FlowFile> getFlowFilesOnRelationship(RoutingResult result, Relationship relationship) {
        final List<FlowFile> routed = result.getRoutedFlowFiles().get(relationship);
        if (routed == null) {
            return Collections.emptyList();
        }
        return routed;
    }

    /**
     * Produces a copy of {@code flowFilesOnRelationship} in which every FlowFile that also appears
     * in {@code flowFilesInGroup} is replaced by the result of {@code addErrorAttributesToFlowFile};
     * all other FlowFiles are carried over unchanged, in the same order.
     *
     * @param session the session owning the FlowFiles
     * @param flowFilesOnRelationship the FlowFiles currently routed to a relationship
     * @param flowFilesInGroup the FlowFiles of the failed group
     * @param exception the failure to describe on the FlowFiles
     * @return the new list
     */
    private List<FlowFile> addErrorAttributesToFlowFilesInGroup(ProcessSession session, List<FlowFile> flowFilesOnRelationship, List<FlowFile> flowFilesInGroup, Exception exception) {
        final List<FlowFile> updated = new ArrayList<>();
        for (final FlowFile routed : flowFilesOnRelationship) {
            if (flowFilesInGroup.contains(routed)) {
                updated.add(this.addErrorAttributesToFlowFile(session, routed, exception));
            } else {
                updated.add(routed);
            }
        }
        return updated;
    }

    /**
     * Builds the error callback used when executing a batch fails.
     *
     * <p>For a {@link BatchUpdateException} (and only when rollback-on-failure is off) the update
     * counts are used to attribute the failure to individual FlowFiles: entries equal to
     * {@code Statement.EXECUTE_FAILED} go to failure with error attributes, other reported entries go
     * to success. If no entry is flagged, the first FlowFile without an update count is taken to be
     * the one that failed. Every remaining FlowFile without an update count goes to retry.
     *
     * <p>The attribution is only attempted when the update counts are usable: not {@code null}, not
     * longer than the batch, and either containing a flagged failure or shorter than the batch. In
     * every other case, and for any other exception, the whole group is handled by
     * {@code onGroupError} followed by an error log entry. This avoids indexing past the end of the
     * batch and avoids leaving a FlowFile unrouted when a driver reports counts in an unexpected shape.
     *
     * @param context the process context
     * @param session the session owning the FlowFiles
     * @param result the routing result being assembled
     * @return the batch error callback, wrapped by {@code RollbackOnFailure.createOnError}
     */
    private ExceptionHandler.OnError<FunctionContext, StatementFlowFileEnclosure> onBatchUpdateError(ProcessContext context, ProcessSession session, RoutingResult result) {
        return RollbackOnFailure.createOnError((c, enclosure, r, e) -> {
            if (e instanceof BatchUpdateException && !c.isRollbackOnFailure()) {
                final int[] updateCounts = ((BatchUpdateException) e).getUpdateCounts();
                final List<FlowFile> batchFlowFiles = enclosure.getFlowFiles();
                final int batchSize = batchFlowFiles.size();
                final int reported = updateCounts == null ? 0 : updateCounts.length;

                boolean failureFlagged = false;
                for (int i = 0; i < reported; ++i) {
                    if (updateCounts[i] == java.sql.Statement.EXECUTE_FAILED) {
                        failureFlagged = true;
                        break;
                    }
                }

                // Only attribute per FlowFile when the counts can be mapped onto the batch and
                // there is a FlowFile that can be blamed for the failure.
                final boolean attributable = updateCounts != null
                        && reported <= batchSize
                        && (failureFlagged || reported < batchSize);
                if (attributable) {
                    int failureCount = 0;
                    int successCount = 0;
                    int retryCount = 0;
                    for (int i = 0; i < reported; ++i) {
                        final FlowFile flowFile = batchFlowFiles.get(i);
                        if (updateCounts[i] == java.sql.Statement.EXECUTE_FAILED) {
                            result.routeTo(this.addErrorAttributesToFlowFile(session, flowFile, e), REL_FAILURE);
                            ++failureCount;
                        } else {
                            result.routeTo(flowFile, REL_SUCCESS);
                            ++successCount;
                        }
                    }

                    int firstUnexecuted = reported;
                    if (failureCount == 0) {
                        // The driver stopped at the first failure: the statement right after the
                        // last reported count is the one that failed.
                        final FlowFile failedFlowFile = batchFlowFiles.get(firstUnexecuted);
                        result.routeTo(this.addErrorAttributesToFlowFile(session, failedFlowFile, e), REL_FAILURE);
                        ++failureCount;
                        ++firstUnexecuted;
                    }

                    // Whatever was never executed may be retried.
                    for (final FlowFile flowFile : batchFlowFiles.subList(firstUnexecuted, batchSize)) {
                        result.routeTo(flowFile, REL_RETRY);
                        ++retryCount;
                    }

                    this.getLogger().error("Failed to update database due to a failed batch update, {}. There were a total of {} FlowFiles that failed, {} that succeeded, and {} that were not execute and will be routed to retry; ", new Object[]{e, failureCount, successCount, retryCount, e});
                    return;
                }
            }

            ExceptionHandler.OnError<RollbackOnFailure, PartialFunctions.FlowFileGroup> onGroupError = this.onGroupError(context, session, result);
            onGroupError = onGroupError.andThen((cl, il, rl, el) -> {
                switch (r.destination()) {
                    case Failure: {
                        this.getLogger().error("Failed to update database for {} due to {}; routing to failure", new Object[]{il.getFlowFiles(), e, e});
                        break;
                    }
                    case Retry: {
                        this.getLogger().error("Failed to update database for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{il.getFlowFiles(), e, e});
                        break;
                    }
                    default:
                        break;
                }
            });
            onGroupError.apply(c, enclosure, r, e);
        });
    }

    /**
     * Wires up the Put-pattern pipeline and the exception handler each time the processor is scheduled.
     *
     * <ul>
     *   <li>fetch / connect / group / put steps are the lambdas held in the corresponding fields;</li>
     *   <li>on completion the connection is committed unless it is in auto-commit mode;</li>
     *   <li>on failure it is rolled back unless it is in auto-commit mode (rollback problems are only logged);</li>
     *   <li>cleanup restores the connection's original auto-commit mode when it was changed;</li>
     *   <li>with fragmented-transaction support enabled, any retry or failure reroutes <em>all</em>
     *       routed FlowFiles to retry (if anything was routed to retry) or else to failure;</li>
     *   <li>exceptions map to InvalidInput for {@link SQLNonTransientException}, TemporalFailure for
     *       any other {@link SQLException}, and UnknownFailure otherwise.</li>
     * </ul>
     */
    @OnScheduled
    public void constructProcess() {
        this.process = new PutGroup<>();
        this.process.setLogger(this.getLogger());
        this.process.fetchFlowFiles(this.fetchFlowFiles);
        this.process.initConnection(this.initConnection);
        this.process.groupFetchedFlowFiles(this.groupFlowFiles);
        this.process.putFlowFiles(this.putFlowFiles);
        this.process.adjustRoute(RollbackOnFailure.createAdjustRoute(new Relationship[]{REL_FAILURE, REL_RETRY}));

        this.process.onCompleted((c, s, fc, conn) -> {
            try {
                if (!conn.getAutoCommit()) {
                    conn.commit();
                }
            } catch (final SQLException e) {
                throw new ProcessException("Failed to commit database connection due to " + e, e);
            }
        });

        this.process.onFailed((c, s, fc, conn, e) -> {
            try {
                if (!conn.getAutoCommit()) {
                    conn.rollback();
                }
            } catch (final SQLException re) {
                this.getLogger().warn("Failed to rollback database connection due to {}", new Object[]{re, re});
            }
        });

        this.process.cleanup((c, s, fc, conn) -> {
            final boolean autocommit = c.getProperty(AUTO_COMMIT).asBoolean();
            if (fc.originalAutoCommit != autocommit) {
                try {
                    conn.setAutoCommit(fc.originalAutoCommit);
                } catch (final SQLException se) {
                    // Same argument shape as the rollback warning above: one value for the
                    // placeholder plus the throwable itself.
                    this.getLogger().warn("Failed to reset autocommit due to {}", new Object[]{se, se});
                }
            }
        });

        this.process.adjustFailed((c, r) -> {
            final boolean anyProblem = r.contains(REL_RETRY) || r.contains(REL_FAILURE);
            if (c.getProperty(SUPPORT_TRANSACTIONS).asBoolean() && anyProblem) {
                final List<FlowFile> transferredFlowFiles = r.getRoutedFlowFiles().values().stream()
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());
                // Retry wins over failure so that the whole transaction can be attempted again.
                final Relationship rerouteShip = r.contains(REL_RETRY) ? REL_RETRY : REL_FAILURE;
                r.getRoutedFlowFiles().clear();
                r.routeTo(transferredFlowFiles, rerouteShip);
                return true;
            }
            return false;
        });

        this.exceptionHandler = new ExceptionHandler<>();
        this.exceptionHandler.mapException(e -> {
            if (e instanceof SQLNonTransientException) {
                return ErrorTypes.InvalidInput;
            }
            if (e instanceof SQLException) {
                return ErrorTypes.TemporalFailure;
            }
            return ErrorTypes.UnknownFailure;
        });
        this.adjustError = RollbackOnFailure.createAdjustError(this.getLogger());
        this.exceptionHandler.adjustError(this.adjustError);
    }

    /**
     * Runs one processing cycle: creates a fresh function context from the Rollback On Failure and
     * Obtain Generated Keys properties and hands control to the Put pipeline through
     * {@code RollbackOnFailure.onTrigger}.
     *
     * @param context the process context
     * @param sessionFactory factory for the session used during this cycle
     * @throws ProcessException if processing fails
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        final FunctionContext functionContext =
                new FunctionContext(context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean());
        functionContext.obtainKeys = context.getProperty(OBTAIN_GENERATED_KEYS).asBoolean();

        RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, this.getLogger(),
                session -> this.process.onTrigger(context, session, functionContext));
    }

    /**
     * Pulls the next set of FlowFiles to process.
     *
     * <p>With fragmented-transaction support enabled the FlowFiles are selected through a
     * {@code TransactionalFlowFileFilter} wrapping the DBCP service's filter; otherwise the service's
     * filter is used directly, or a plain batch-size poll when the service supplies no filter.
     *
     * <p>For a fragmented transaction that is not ready yet, every FlowFile is penalized and routed
     * back to {@code Relationship.SELF}. When it is ready, the FlowFiles are ordered by their
     * fragment.index attribute. An {@link IllegalArgumentException} raised while checking readiness
     * or while ordering (a non-numeric index surfaces as {@link NumberFormatException}, which is a
     * subclass) is treated as invalid input and passed to the group error handler.
     *
     * @param context the process context
     * @param session the session to poll from
     * @param functionContext the per-trigger function context
     * @param result the routing result being assembled
     * @return the polled FlowFiles together with the fragmented-transaction flag, or {@code null}
     *         when there is nothing to process in this cycle
     */
    private FlowFilePoll pollFlowFiles(ProcessContext context, ProcessSession session, FunctionContext functionContext, RoutingResult result) {
        final boolean useTransactions = context.getProperty(SUPPORT_TRANSACTIONS).asBoolean();
        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final FlowFileFilter dbcpServiceFlowFileFilter =
                context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class).getFlowFileFilter(batchSize);

        final List<FlowFile> flowFiles;
        boolean fragmentedTransaction = false;
        if (useTransactions) {
            final TransactionalFlowFileFilter filter = new TransactionalFlowFileFilter(dbcpServiceFlowFileFilter);
            flowFiles = session.get(filter);
            fragmentedTransaction = filter.isFragmentedTransaction();
        } else if (dbcpServiceFlowFileFilter == null) {
            flowFiles = session.get(batchSize);
        } else {
            flowFiles = session.get(dbcpServiceFlowFileFilter);
        }

        if (flowFiles.isEmpty()) {
            return null;
        }

        if (fragmentedTransaction) {
            try {
                if (!this.isFragmentedTransactionReady(flowFiles, context.getProperty(TRANSACTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS))) {
                    // Not all fragments have arrived: put everything back and try again later.
                    flowFiles.forEach(f -> result.routeTo(session.penalize(f), Relationship.SELF));
                    return null;
                }
                // Sorting sits inside the try so that an unparsable fragment.index is handled
                // like any other invalid fragment attribute.
                flowFiles.sort(Comparator.comparingInt(f -> Integer.parseInt(f.getAttribute(FRAGMENT_INDEX_ATTR))));
            } catch (final IllegalArgumentException e) {
                final ErrorTypes.Result adjustedRoute = this.adjustError.apply(functionContext, ErrorTypes.InvalidInput);
                this.onGroupError(context, session, result).apply(functionContext, () -> flowFiles, adjustedRoute, e);
                return null;
            }
        }
        return new FlowFilePoll(flowFiles, fragmentedTransaction);
    }

    /**
     * Returns the first auto-generated key produced by the given statement, if any.
     *
     * <p>This is a best-effort lookup: any {@link SQLException} raised while fetching, reading
     * or closing the keys is swallowed. The generated-keys {@link ResultSet} is always closed
     * before returning so that it does not stay open until the statement itself is closed.
     *
     * @param stmt the statement that has just been executed
     * @return the value of the first column of the first generated-key row as a string, or
     *         {@code null} when no key is available or it could not be read
     */
    private String determineGeneratedKey(PreparedStatement stmt) {
        String generatedKey = null;
        // A null resource is permitted by try-with-resources and is simply not closed.
        try (ResultSet generatedKeys = stmt.getGeneratedKeys()) {
            if (generatedKeys != null && generatedKeys.next()) {
                generatedKey = generatedKeys.getString(1);
            }
        }
        catch (SQLException ignored) {
            // Deliberately ignored: this lookup is best effort, so a failure here must not
            // fail the update. A key that was already read is still returned even if
            // closing the ResultSet fails.
        }
        return generatedKey;
    }

    /**
     * Reads the whole content of a FlowFile and decodes it as a UTF-8 SQL string.
     *
     * @param session  session used to read the FlowFile content
     * @param flowFile FlowFile whose content is the SQL statement
     * @return the FlowFile content decoded as UTF-8
     */
    private String getSQL(ProcessSession session, FlowFile flowFile) {
        // The buffer is sized from the reported FlowFile size (narrowed to int).
        int contentLength = (int) flowFile.getSize();
        byte[] content = new byte[contentLength];
        session.read(flowFile, in -> StreamUtils.fillBuffer(in, content));
        return new String(content, StandardCharsets.UTF_8);
    }

    /**
     * Decides whether a group of FlowFiles sharing a fragment identifier can be processed now.
     *
     * <p>Every FlowFile must carry a positive integer {@code fragment.count} that is the same
     * across the group, and a non-negative integer {@code fragment.index} that is unique within
     * the group. A single FlowFile without a {@code fragment.count} is treated as ready. The
     * group is ready when the declared count equals the number of FlowFiles supplied.
     *
     * @param flowFiles                the FlowFiles selected for the transaction
     * @param transactionTimeoutMillis maximum age in milliseconds of the most recently queued
     *                                 FlowFile before an incomplete group is rejected, or
     *                                 {@code null} for no timeout
     * @return {@code true} when the group is complete, {@code false} when fragments are still missing
     * @throws IllegalArgumentException when a fragment attribute is missing, malformed or
     *                                  inconsistent, or when an incomplete group has timed out
     */
    boolean isFragmentedTransactionReady(List<FlowFile> flowFiles, Long transactionTimeoutMillis) throws IllegalArgumentException {
        int expectedFragments = 0;
        BitSet seenIndexes = new BitSet();

        for (FlowFile candidate : flowFiles) {
            String countAttr = candidate.getAttribute(FRAGMENT_COUNT_ATTR);
            if (countAttr == null) {
                if (flowFiles.size() == 1) {
                    return true;
                }
                throw new IllegalArgumentException(String.format("Cannot process %s because there are %d FlowFiles with the same fragment.identifier attribute but not all FlowFiles have a fragment.count attribute", candidate, flowFiles.size()));
            }

            int declaredCount;
            try {
                declaredCount = Integer.parseInt(countAttr);
            }
            catch (NumberFormatException notANumber) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.count attribute has a value of '%s', which is not an integer", candidate, countAttr));
            }
            if (declaredCount < 1) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.count attribute has a value of '%s', which is not a positive integer", candidate, countAttr));
            }
            // The first FlowFile fixes the expected count; every later one has to agree with it.
            if (expectedFragments != 0 && declaredCount != expectedFragments) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.count attribute has different values for different FlowFiles with the same fragment.identifier", candidate));
            }
            expectedFragments = declaredCount;

            String indexAttr = candidate.getAttribute(FRAGMENT_INDEX_ATTR);
            if (indexAttr == null) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.index attribute is missing", candidate));
            }
            int fragmentIdx;
            try {
                fragmentIdx = Integer.parseInt(indexAttr);
            }
            catch (NumberFormatException notANumber) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.index attribute has a value of '%s', which is not an integer", candidate, indexAttr));
            }
            if (fragmentIdx < 0) {
                throw new IllegalArgumentException(String.format("Cannot process %s because the fragment.index attribute has a value of '%s', which is not a positive integer", candidate, indexAttr));
            }
            if (seenIndexes.get(fragmentIdx)) {
                throw new IllegalArgumentException(String.format("Cannot process %s because it has the same value for the fragment.index attribute as another FlowFile with the same fragment.identifier", candidate));
            }
            seenIndexes.set(fragmentIdx);
        }

        if (expectedFragments == flowFiles.size()) {
            return true;
        }

        // Fragments are missing: find when the newest FlowFile was queued to evaluate the timeout.
        long newestQueueDate = 0L;
        for (FlowFile candidate : flowFiles) {
            Long queueDate = candidate.getLastQueueDate();
            if (queueDate != null && queueDate > newestQueueDate) {
                newestQueueDate = queueDate;
            }
        }
        if (transactionTimeoutMillis != null && newestQueueDate > 0L) {
            long waitedMillis = System.currentTimeMillis() - newestQueueDate;
            if (waitedMillis > transactionTimeoutMillis) {
                throw new IllegalArgumentException(String.format("The transaction timeout has expired for the following FlowFiles; they will be routed to failure: %s", flowFiles));
            }
        }

        this.getLogger().debug("Not enough FlowFiles for transaction. Returning all FlowFiles to queue");
        return false;
    }

    /**
     * Copies details of a failure onto a FlowFile as attributes.
     *
     * <p>The exception message is always recorded. For a {@link SQLException} the vendor error
     * code (only when greater than zero) and the SQL state (only when non-null) are recorded too.
     *
     * @param session   session used to update the FlowFile
     * @param flowFile  FlowFile to annotate
     * @param exception the failure to describe
     * @return the FlowFile returned by the session after the attributes were added
     */
    private FlowFile addErrorAttributesToFlowFile(ProcessSession session, FlowFile flowFile, Exception exception) {
        Map<String, String> errorAttributes = new HashMap<>();
        errorAttributes.put(ERROR_MESSAGE_ATTR, exception.getMessage());

        if (!(exception instanceof SQLException)) {
            return session.putAllAttributes(flowFile, errorAttributes);
        }

        SQLException sqlFailure = (SQLException) exception;
        int vendorCode = sqlFailure.getErrorCode();
        String state = sqlFailure.getSQLState();
        if (vendorCode > 0) {
            errorAttributes.put(ERROR_CODE_ATTR, String.valueOf(vendorCode));
        }
        if (state != null) {
            errorAttributes.put(ERROR_SQL_STATE_ATTR, state);
        }
        return session.putAllAttributes(flowFile, errorAttributes);
    }

    @FunctionalInterface
    private static interface GroupingFunction {
        public void apply(ProcessContext var1, ProcessSession var2, FunctionContext var3, Connection var4, List<FlowFile> var5, List<StatementFlowFileEnclosure> var6, RoutingResult var7);
    }

    /**
     * Per-invocation state, built on {@code RollbackOnFailure}.
     *
     * <p>All flags start out {@code false}; {@code startNanos} records {@link System#nanoTime()}
     * at construction time.
     */
    private static class FunctionContext
    extends RollbackOnFailure {
        /** Whether generated keys should be obtained from executed statements. */
        private boolean obtainKeys;
        /** Whether the FlowFiles being processed form a fragmented transaction. */
        private boolean fragmentedTransaction;
        /** NOTE(review): presumably the connection's auto-commit setting before processing - confirm at the use site. */
        private boolean originalAutoCommit;
        private final long startNanos = System.nanoTime();

        /**
         * @param rollbackOnFailure forwarded to the {@code RollbackOnFailure} constructor; the
         *                          second constructor argument is fixed to {@code true}
         */
        private FunctionContext(boolean rollbackOnFailure) {
            super(rollbackOnFailure, true);
        }

        /** Batching is supported only when neither generated keys nor a fragmented transaction are involved. */
        private boolean isSupportBatching() {
            return !(this.obtainKeys || this.fragmentedTransaction);
        }
    }

    /**
     * FlowFile filter that selects either a run of non-fragmented FlowFiles or all FlowFiles
     * belonging to one fragmented transaction, never a mix of both.
     *
     * <p>The first accepted FlowFile decides the mode. A FlowFile counts as non-fragmented when
     * it has no fragment identifier or its fragment count is {@code "1"}; such FlowFiles are
     * delegated to the optional wrapped filter. Otherwise the first fragment identifier seen is
     * selected and only FlowFiles carrying that identifier are accepted, terminating once the
     * declared number of fragments has been accepted.
     */
    static class TransactionalFlowFileFilter
    implements FlowFileFilter {
        private final FlowFileFilter nonFragmentedTransactionFilter;
        private String selectedId = null;
        private int numSelected = 0;
        private boolean ignoreFragmentIdentifiers = false;

        /**
         * @param nonFragmentedTransactionFilter filter consulted for non-fragmented FlowFiles;
         *                                       may be {@code null}, in which case they are
         *                                       always accepted
         */
        public TransactionalFlowFileFilter(FlowFileFilter nonFragmentedTransactionFilter) {
            this.nonFragmentedTransactionFilter = nonFragmentedTransactionFilter;
        }

        /**
         * @return {@code true} unless the filter has switched to non-fragmented mode
         */
        public boolean isFragmentedTransaction() {
            return !this.ignoreFragmentIdentifiers;
        }

        /** Applies the wrapped filter to a non-fragmented FlowFile, accepting it when there is no wrapped filter. */
        private FlowFileFilter.FlowFileFilterResult filterNonFragmentedTransaction(FlowFile flowFile) {
            if (this.nonFragmentedTransactionFilter == null) {
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            return this.nonFragmentedTransactionFilter.filter(flowFile);
        }

        /**
         * Parses a fragment count, treating anything unusable as "unknown".
         *
         * @param fragCount raw attribute value, possibly {@code null}
         * @return the parsed count, or {@link Integer#MAX_VALUE} when the value is missing, does
         *         not match the number pattern, or does not fit in an {@code int}
         */
        private static int parseFragmentCount(String fragCount) {
            if (fragCount == null || !JdbcCommon.NUMBER_PATTERN.matcher(fragCount).matches()) {
                return Integer.MAX_VALUE;
            }
            try {
                return Integer.parseInt(fragCount);
            }
            catch (NumberFormatException outOfRange) {
                // A value can satisfy the pattern and still overflow int; the filter must not
                // throw for that, so fall back to "unknown" and keep accepting fragments.
                return Integer.MAX_VALUE;
            }
        }

        /**
         * Decides whether the given FlowFile belongs to the group currently being selected.
         *
         * @param flowFile candidate FlowFile
         * @return the accept/reject decision for this FlowFile
         */
        public FlowFileFilter.FlowFileFilterResult filter(FlowFile flowFile) {
            String fragmentId = flowFile.getAttribute(FRAGMENT_ID_ATTR);
            String fragCount = flowFile.getAttribute(FRAGMENT_COUNT_ATTR);
            boolean nonFragmented = fragmentId == null || "1".equals(fragCount);

            if (this.ignoreFragmentIdentifiers) {
                // Non-fragmented mode: only non-fragmented FlowFiles are eligible.
                if (nonFragmented) {
                    return this.filterNonFragmentedTransaction(flowFile);
                }
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
            if (nonFragmented) {
                if (this.selectedId == null) {
                    // Nothing selected yet: this FlowFile switches the filter to non-fragmented mode.
                    this.ignoreFragmentIdentifiers = true;
                    return this.filterNonFragmentedTransaction(flowFile);
                }
                return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
            }
            if (this.selectedId == null) {
                // First fragmented FlowFile: lock on to its fragment identifier.
                this.selectedId = fragmentId;
                ++this.numSelected;
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            if (this.selectedId.equals(fragmentId)) {
                int numFragments = parseFragmentCount(fragCount);
                if (this.numSelected >= numFragments - 1) {
                    // This FlowFile completes the declared number of fragments.
                    return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE;
                }
                ++this.numSelected;
                return FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE;
            }
            return FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        }
    }

    /**
     * Result of polling the session: the FlowFiles obtained and whether they were selected as
     * a fragmented transaction.
     */
    private static class FlowFilePoll {
        private final List<FlowFile> flowFiles;
        private final boolean fragmentedTransaction;

        /**
         * @param polledFlowFiles the FlowFiles that were polled; the list is stored as given
         * @param fragmented      whether the FlowFiles form a fragmented transaction
         */
        public FlowFilePoll(List<FlowFile> polledFlowFiles, boolean fragmented) {
            this.fragmentedTransaction = fragmented;
            this.flowFiles = polledFlowFiles;
        }

        /** @return the polled FlowFiles (the same list instance passed to the constructor) */
        public List<FlowFile> getFlowFiles() {
            return flowFiles;
        }

        /** @return whether the polled FlowFiles form a fragmented transaction */
        public boolean isFragmentedTransaction() {
            return fragmentedTransaction;
        }
    }

    /**
     * Groups the FlowFiles that share one SQL statement, together with a lazily created
     * {@link PreparedStatement} for it.
     *
     * <p>Identity is defined by the SQL text. An enclosure created without SQL ({@code null})
     * is equal only to itself.
     */
    private static class StatementFlowFileEnclosure
    implements PartialFunctions.FlowFileGroup {
        private final String sql;
        private PreparedStatement statement;
        private final List<FlowFile> flowFiles = new ArrayList<>();

        /**
         * @param sql the SQL text shared by the FlowFiles of this group; may be {@code null}
         */
        public StatementFlowFileEnclosure(String sql) {
            this.sql = sql;
        }

        /** @return the SQL text of this group */
        public String getSql() {
            return this.sql;
        }

        /**
         * Prepares a new, uncached statement for this group's SQL.
         *
         * @param conn       connection to prepare the statement on
         * @param obtainKeys whether the statement should be able to return generated keys
         * @return a newly prepared statement
         * @throws SQLException if the statement cannot be prepared
         */
        public PreparedStatement getNewStatement(Connection conn, boolean obtainKeys) throws SQLException {
            if (!obtainKeys) {
                return conn.prepareStatement(this.sql);
            }
            PreparedStatement stmt = conn.prepareStatement(this.sql, java.sql.Statement.RETURN_GENERATED_KEYS);
            if (stmt == null) {
                // Fall back to a plain statement when the connection hands back null for the
                // generated-keys variant.
                stmt = conn.prepareStatement(this.sql);
            }
            return stmt;
        }

        /**
         * Returns the statement cached for this group, preparing it on first use.
         *
         * @param conn connection to prepare the statement on when none is cached yet
         * @return the cached statement
         * @throws SQLException if the statement cannot be prepared
         */
        public PreparedStatement getCachedStatement(Connection conn) throws SQLException {
            if (this.statement == null) {
                this.statement = conn.prepareStatement(this.sql);
            }
            return this.statement;
        }

        /** @return the live list of FlowFiles in this group */
        public List<FlowFile> getFlowFiles() {
            return this.flowFiles;
        }

        /** Adds a FlowFile to this group. */
        public void addFlowFile(FlowFile flowFile) {
            this.flowFiles.add(flowFile);
        }

        /** Hash of the SQL text; {@code 0} when there is no SQL. */
        @Override
        public int hashCode() {
            return this.sql == null ? 0 : this.sql.hashCode();
        }

        /**
         * Two enclosures are equal when they carry the same SQL text. An object is always equal
         * to itself, as the {@link Object#equals(Object)} contract requires; enclosures without
         * SQL are otherwise never equal to anything.
         */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StatementFlowFileEnclosure)) {
                return false;
            }
            StatementFlowFileEnclosure other = (StatementFlowFileEnclosure) obj;
            return this.sql != null && this.sql.equals(other.sql);
        }
    }

    /**
     * Enclosure without SQL of its own that remembers, for every FlowFile added through
     * {@link #addFlowFile(FlowFile, StatementFlowFileEnclosure)}, the statement enclosure that
     * FlowFile is associated with.
     */
    private static class FragmentedEnclosure
    extends StatementFlowFileEnclosure {
        private final Map<FlowFile, StatementFlowFileEnclosure> flowFileToEnclosure = new HashMap<>();

        /** Creates an enclosure with {@code null} SQL. */
        public FragmentedEnclosure() {
            super(null);
        }

        /**
         * Adds a FlowFile to this enclosure and records its target enclosure.
         *
         * @param flowFile  the FlowFile to add
         * @param enclosure the statement enclosure associated with the FlowFile
         */
        public void addFlowFile(FlowFile flowFile, StatementFlowFileEnclosure enclosure) {
            addFlowFile(flowFile);
            flowFileToEnclosure.put(flowFile, enclosure);
        }

        /**
         * @param flowFile a FlowFile previously added with a target enclosure
         * @return the enclosure recorded for the FlowFile, or {@code null} when none was recorded
         */
        public StatementFlowFileEnclosure getTargetEnclosure(FlowFile flowFile) {
            return flowFileToEnclosure.get(flowFile);
        }
    }
}

