/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import jakarta.xml.bind.DatatypeConverter;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import org.apache.commons.lang3.RandomUtils;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processors.standard.PutSQL;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestPutSQL {
    private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)";
    private static final String createPersonsAutoId = "CREATE TABLE PERSONS_AI (id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1), name VARCHAR(100), code INTEGER check(code <= 100))";
    private static final String DERBY_LOG_PROPERTY = "derby.stream.error.file";
    protected static DBCPService service;

    @BeforeAll
    public static void setupDerbyLog() throws ProcessException, SQLException {
        System.setProperty(DERBY_LOG_PROPERTY, "target/derby.log");
        File dbDir = new File(TestPutSQL.getEmptyDirectory(), "db");
        dbDir.deleteOnExit();
        service = new MockDBCPService(dbDir.getAbsolutePath());
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate(createPersons);
            stmt.executeUpdate(createPersonsAutoId);
        }
    }

    @AfterAll
    public static void cleanupDerbyLog() {
        System.clearProperty(DERBY_LOG_PROPERTY);
    }

    @Test
    public void testDirectStatements() throws InitializationException, ProcessException, SQLException {
        ResultSet rs;
        Statement stmt;
        TestRunner runner = this.initTestRunner();
        this.recreateTable("PERSONS", createPersons);
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();){
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        runner.enqueue("UPDATE PERSONS SET NAME='George' WHERE ID=1".getBytes());
        runner.run();
        conn = service.getConnection();
        try {
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"George", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    @Test
    public void testCommitOnCleanup() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.AUTO_COMMIT, "false");
        this.recreateTable("PERSONS", createPersons);
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)".getBytes());
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
            Assertions.assertEquals((int)84, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testInsertWithGeneratedKeys() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
        this.recreateTable("PERSONS_AI", createPersonsAutoId);
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        MockFlowFile mff = (MockFlowFile)runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS).get(0);
        mff.assertAttributeEquals("sql.generated.key", "1");
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
            Assertions.assertEquals((int)84, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testProvenanceEventsWithBatchMode() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        this.testProvenanceEvents(runner);
    }

    @Test
    public void testProvenanceEventsWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        this.testProvenanceEvents(runner);
    }

    @Test
    public void testProvenanceEventsWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
        this.testProvenanceEvents(runner);
    }

    private void testProvenanceEvents(TestRunner runner) throws ProcessException, SQLException {
        this.recreateTable("PERSONS", createPersons);
        runner.enqueue("DELETE FROM PERSONS WHERE ID = 1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (1, 'Mark', 84)");
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 2);
        List provenanceEvents = runner.getProvenanceEvents();
        Assertions.assertEquals((int)2, (int)provenanceEvents.size());
        for (ProvenanceEventRecord event : provenanceEvents) {
            Assertions.assertEquals((Object)ProvenanceEventType.SEND, (Object)event.getEventType());
        }
    }

    @Test
    public void testKeepFlowFileOrderingWithBatchMode() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        this.testKeepFlowFileOrdering(runner);
    }

    @Test
    public void testKeepFlowFileOrderingWithFragmentedTransaction() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "true");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        this.testKeepFlowFileOrdering(runner);
    }

    @Test
    public void testKeepFlowFileOrderingWithObtainGeneratedKeys() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "10");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "true");
        this.testKeepFlowFileOrdering(runner);
    }

    private void testKeepFlowFileOrdering(TestRunner runner) throws ProcessException, SQLException {
        this.recreateTable("PERSONS", createPersons);
        String delete = "DELETE FROM PERSONS WHERE ID = ?";
        String insert = "INSERT INTO PERSONS (ID) VALUES (?)";
        String[] statements = new String[]{"DELETE FROM PERSONS WHERE ID = ?", "INSERT INTO PERSONS (ID) VALUES (?)", "INSERT INTO PERSONS (ID) VALUES (?)", "DELETE FROM PERSONS WHERE ID = ?", "DELETE FROM PERSONS WHERE ID = ?", "INSERT INTO PERSONS (ID) VALUES (?)"};
        Function<Integer, Map> createSqlAttributes = id -> {
            HashMap<String, String> attributes = new HashMap<String, String>();
            attributes.put("sql.args.1.type", String.valueOf(4));
            attributes.put("sql.args.1.value", String.valueOf(id));
            return attributes;
        };
        int flowFileCount = statements.length;
        for (int i = 0; i < flowFileCount; ++i) {
            runner.enqueue(statements[i], createSqlAttributes.apply(i));
        }
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, flowFileCount);
        List flowFiles = runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS);
        for (int i = 0; i < flowFileCount; ++i) {
            MockFlowFile flowFile = (MockFlowFile)flowFiles.get(i);
            Assertions.assertEquals((Object)statements[i], (Object)flowFile.getContent());
            Assertions.assertEquals((Object)String.valueOf(i), (Object)flowFile.getAttribute("sql.args.1.value"));
        }
        List provenanceEvents = runner.getProvenanceEvents();
        Assertions.assertEquals((int)flowFileCount, (int)provenanceEvents.size());
        for (int i = 0; i < flowFileCount; ++i) {
            ProvenanceEventRecord event = (ProvenanceEventRecord)provenanceEvents.get(i);
            Assertions.assertEquals((Object)String.valueOf(i), (Object)event.getAttribute("sql.args.1.value"));
        }
    }

    @Test
    public void testFailInMiddleWithBadStatementAndSupportTransaction() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        this.testFailInMiddleWithBadStatement(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        TestPutSQL.assertErrorAttributesInTransaction(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testFailInMiddleWithBadStatementAndNotSupportTransaction() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        this.testFailInMiddleWithBadStatement(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
        TestPutSQL.assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    @Test
    public void testFailInMiddleWithBadParameterTypeAndNotSupportTransaction() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        this.testFailInMiddleWithBadParameterType(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
        TestPutSQL.assertErrorAttributesNotSet(runner, PutSQL.REL_SUCCESS);
        TestPutSQL.assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testFailInMiddleWithBadParameterTypeAndSupportTransaction() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        this.testFailInMiddleWithBadParameterType(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_FAILURE, 4);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        TestPutSQL.assertErrorAttributesInTransaction(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testFailInMiddleWithBadParameterTypeRollbackOnFailure() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        HashMap<String, String> goodAttributes = new HashMap<String, String>();
        goodAttributes.put("sql.args.1.type", String.valueOf(4));
        goodAttributes.put("sql.args.1.value", "84");
        HashMap<String, String> badAttributes = new HashMap<String, String>();
        badAttributes.put("sql.args.1.type", String.valueOf(12));
        badAttributes.put("sql.args.1.value", "hello");
        byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
    }

    @Test
    public void testFailInMiddleWithNumberFormatException() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "false");
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        HashMap<String, String> goodAttributes = new HashMap<String, String>();
        goodAttributes.put("sql.args.1.type", String.valueOf(4));
        goodAttributes.put("sql.args.1.value", "84");
        HashMap<String, String> badAttributes = new HashMap<String, String>();
        badAttributes.put("sql.args.1.type", String.valueOf(4));
        badAttributes.put("sql.args.1.value", "hello");
        byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 3);
        TestPutSQL.assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testFailInMiddleWithBadParameterValueAndSupportTransaction() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        this.testFailInMiddleWithBadParameterValue(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_RETRY, 4);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
            Assertions.assertFalse((boolean)rs.next());
        }
        TestPutSQL.assertErrorAttributesInTransaction(runner, PutSQL.REL_RETRY);
        TestPutSQL.assertOriginalAttributesAreKept(runner);
    }

    @Test
    public void testFailInMiddleWithBadParameterValueAndNotSupportTransaction() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.SUPPORT_TRANSACTIONS, "false");
        this.testFailInMiddleWithBadParameterValue(runner);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 1);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 1);
        runner.assertTransferCount(PutSQL.REL_RETRY, 2);
        TestPutSQL.assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE);
        TestPutSQL.assertOriginalAttributesAreKept(runner);
        TestPutSQL.assertErrorAttributesNotSet(runner, PutSQL.REL_SUCCESS);
        TestPutSQL.assertErrorAttributesNotSet(runner, PutSQL.REL_RETRY);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
            Assertions.assertEquals((int)84, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testFailInMiddleWithBadParameterValueRollbackOnFailure() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        this.recreateTable("PERSONS_AI", createPersonsAutoId);
        HashMap<String, String> goodAttributes = new HashMap<String, String>();
        goodAttributes.put("sql.args.1.type", String.valueOf(4));
        goodAttributes.put("sql.args.1.value", "84");
        HashMap<String, String> badAttributes = new HashMap<String, String>();
        badAttributes.put("sql.args.1.type", String.valueOf(4));
        badAttributes.put("sql.args.1.value", "9999");
        byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 0);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS_AI");
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingSqlDataTypesWithNegativeValues() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE PERSONS2 (id integer primary key, name varchar(100), code bigint)");
        }
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", "-5");
        attributes.put("sql.args.1.value", "84");
        runner.enqueue("INSERT INTO PERSONS2 (ID, NAME, CODE) VALUES (1, 'Mark', ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS2");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
            Assertions.assertEquals((int)84, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingTimestampValuesEpochAndString() throws InitializationException, ProcessException, SQLException, ParseException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST1 (id integer primary key, ts1 timestamp, ts2 timestamp)");
        }
        String arg2TS = "2001-01-01 00:01:01.001";
        String art3TS = "2002-02-02 12:02:02.002";
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date parsedDate = dateFormat.parse("2001-01-01 00:01:01.001");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(93));
        attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(93));
        attributes.put("sql.args.2.value", "2002-02-02 12:02:02.002");
        runner.enqueue("INSERT INTO TIMESTAMPTEST1 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST1");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"2001-01-01 00:01:01.001", (Object)rs.getString(2));
            Assertions.assertEquals((Object)"2002-02-02 12:02:02.002", (Object)rs.getString(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingTimestampValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST2 (id integer primary key, ts1 timestamp, ts2 timestamp)");
        }
        String dateStr1 = "2002-02-02T12:02:02";
        String dateStrTimestamp1 = "2002-02-02 12:02:02";
        long dateInt1 = Timestamp.valueOf("2002-02-02 12:02:02").getTime();
        String dateStr2 = "2002-02-02T12:02:02.123456789";
        String dateStrTimestamp2 = "2002-02-02 12:02:02.123456789";
        long dateInt2 = Timestamp.valueOf("2002-02-02 12:02:02.123456789").getTime();
        long nanoInt2 = 123456789L;
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(93));
        attributes.put("sql.args.1.value", "2002-02-02T12:02:02");
        attributes.put("sql.args.1.format", "ISO_LOCAL_DATE_TIME");
        attributes.put("sql.args.2.type", String.valueOf(93));
        attributes.put("sql.args.2.value", "2002-02-02T12:02:02.123456789");
        attributes.put("sql.args.2.format", "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");
        runner.enqueue("INSERT INTO TIMESTAMPTEST2 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST2");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((long)dateInt1, (long)rs.getTimestamp(2).getTime());
            Assertions.assertEquals((long)dateInt2, (long)rs.getTimestamp(3).getTime());
            Assertions.assertEquals((long)123456789L, (long)rs.getTimestamp(3).getNanos());
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingDateTimeValuesWithFormatAttribute() throws InitializationException, ProcessException, SQLException, ParseException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE TIMESTAMPTEST3 (id integer primary key, ts1 TIME, ts2 DATE)");
        }
        String dateStr = "2002-03-04";
        String timeStr = "02:03:04";
        String timeFormatString = "HH:mm:ss";
        String dateFormatString = "yyyy-MM-dd";
        DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_LOCAL_TIME;
        LocalTime parsedTime = LocalTime.parse("02:03:04", timeFormatter);
        Time expectedTime = Time.valueOf(parsedTime);
        DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_LOCAL_DATE;
        LocalDate parsedDate = LocalDate.parse("2002-03-04", dateFormatter);
        Date expectedDate = new Date(Date.from(parsedDate.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()).getTime());
        long expectedTimeInLong = expectedTime.getTime();
        long expectedDateInLong = expectedDate.getTime();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(92));
        attributes.put("sql.args.1.value", "02:03:04");
        attributes.put("sql.args.1.format", "ISO_LOCAL_TIME");
        attributes.put("sql.args.2.type", String.valueOf(91));
        attributes.put("sql.args.2.value", "2002-03-04");
        attributes.put("sql.args.2.format", "ISO_LOCAL_DATE");
        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
        Date parsedLocalTime = timeFormat.parse("02:03:04");
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date parsedLocalDate = dateFormat.parse("2002-03-04");
        attributes = new HashMap();
        attributes.put("sql.args.1.type", String.valueOf(92));
        attributes.put("sql.args.1.value", Long.toString(parsedLocalTime.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(91));
        attributes.put("sql.args.2.value", Long.toString(parsedLocalDate.getTime()));
        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (2, ?, ?)".getBytes(), attributes);
        attributes = new HashMap();
        attributes.put("sql.args.1.type", String.valueOf(92));
        attributes.put("sql.args.1.value", "020304000");
        attributes.put("sql.args.1.format", "HHmmssSSS");
        attributes.put("sql.args.2.type", String.valueOf(91));
        attributes.put("sql.args.2.value", "20020304");
        attributes.put("sql.args.2.format", "yyyyMMdd");
        runner.enqueue("INSERT INTO TIMESTAMPTEST3 (ID, ts1, ts2) VALUES (3, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 3);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM TIMESTAMPTEST3 ORDER BY ID");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((long)expectedTimeInLong, (long)rs.getTime(2).getTime());
            Assertions.assertEquals((long)expectedDateInLong, (long)rs.getDate(3).getTime());
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)2, (int)rs.getInt(1));
            Assertions.assertEquals((long)parsedLocalTime.getTime(), (long)rs.getTime(2).getTime());
            Assertions.assertEquals((long)parsedLocalDate.getTime(), (long)rs.getDate(3).getTime());
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)3, (int)rs.getInt(1));
            Assertions.assertEquals((long)expectedTimeInLong, (long)rs.getTime(2).getTime());
            Assertions.assertEquals((long)expectedDateInLong, (long)rs.getDate(3).getTime());
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testBitType() throws SQLException, InitializationException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE BITTESTS (id integer primary key, bt1 BOOLEAN)");
        }
        byte[] insertStatement = "INSERT INTO BITTESTS (ID, bt1) VALUES (?, ?)".getBytes();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "1");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "2");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "0");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "3");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "-5");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "4");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "t");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "5");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "f");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "6");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "T");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "7");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "true");
        runner.enqueue(insertStatement, attributes);
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "8");
        attributes.put("sql.args.2.type", String.valueOf(-7));
        attributes.put("sql.args.2.value", "false");
        runner.enqueue(insertStatement, attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 8);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM BITTESTS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertTrue((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)2, (int)rs.getInt(1));
            Assertions.assertFalse((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)3, (int)rs.getInt(1));
            Assertions.assertFalse((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)4, (int)rs.getInt(1));
            Assertions.assertTrue((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)5, (int)rs.getInt(1));
            Assertions.assertFalse((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)6, (int)rs.getInt(1));
            Assertions.assertTrue((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)7, (int)rs.getInt(1));
            Assertions.assertTrue((boolean)rs.getBoolean(2));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)8, (int)rs.getInt(1));
            Assertions.assertFalse((boolean)rs.getBoolean(2));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingTimeValuesEpochAndString() throws InitializationException, ProcessException, SQLException, ParseException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE TIMETESTS (id integer primary key, ts1 time, ts2 time)");
        }
        String arg2TS = "00:01:02";
        String art3TS = "02:03:04";
        String timeFormatString = "HH:mm:ss";
        SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
        Date parsedDate = dateFormat.parse("00:01:02");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(92));
        attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(92));
        attributes.put("sql.args.2.value", "02:03:04");
        attributes.put("sql.args.2.format", "HH:mm:ss");
        runner.enqueue("INSERT INTO TIMETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM TIMETESTS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"00:01:02", (Object)dateFormat.format(rs.getTime(2)));
            Assertions.assertEquals((Object)"02:03:04", (Object)rs.getString(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testUsingDateValuesEpochAndString() throws InitializationException, ProcessException, SQLException, ParseException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE DATETESTS (id integer primary key, ts1 date, ts2 date)");
        }
        String arg2TS = "2001-01-01";
        String art3TS = "2002-02-02";
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date parsedDate = dateFormat.parse("2001-01-01");
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(91));
        attributes.put("sql.args.1.value", Long.toString(parsedDate.getTime()));
        attributes.put("sql.args.2.type", String.valueOf(91));
        attributes.put("sql.args.2.value", "2002-02-02");
        runner.enqueue("INSERT INTO DATETESTS (ID, ts1, ts2) VALUES (1, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM DATETESTS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"2001-01-01", (Object)rs.getString(2));
            Assertions.assertEquals((Object)"2002-02-02", (Object)rs.getString(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testBinaryColumnTypes() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("CREATE TABLE BINARYTESTS (id integer primary key, bn1 CHAR(8) FOR BIT DATA, bn2 VARCHAR(100) FOR BIT DATA, bn3 LONG VARCHAR FOR BIT DATA)");
        }
        byte[] insertStatement = "INSERT INTO BINARYTESTS (ID, bn1, bn2, bn3) VALUES (?, ?, ?, ?)".getBytes();
        String arg2BIN = this.fixedSizeByteArrayAsASCIIString(8);
        String art3VARBIN = this.fixedSizeByteArrayAsASCIIString(50);
        String art4LongBin = this.fixedSizeByteArrayAsASCIIString(32700);
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(-2));
        attributes.put("sql.args.2.value", arg2BIN);
        attributes.put("sql.args.3.type", String.valueOf(-3));
        attributes.put("sql.args.3.value", art3VARBIN);
        attributes.put("sql.args.4.type", String.valueOf(-4));
        attributes.put("sql.args.4.value", art4LongBin);
        runner.enqueue(insertStatement, attributes);
        attributes = new HashMap();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "2");
        attributes.put("sql.args.2.type", String.valueOf(-2));
        attributes.put("sql.args.2.value", arg2BIN);
        attributes.put("sql.args.2.format", "ascii");
        attributes.put("sql.args.3.type", String.valueOf(-3));
        attributes.put("sql.args.3.value", art3VARBIN);
        attributes.put("sql.args.3.format", "ascii");
        attributes.put("sql.args.4.type", String.valueOf(-4));
        attributes.put("sql.args.4.value", art4LongBin);
        attributes.put("sql.args.4.format", "ascii");
        runner.enqueue(insertStatement, attributes);
        String arg2HexBIN = this.fixedSizeByteArrayAsHexString(8);
        String art3HexVARBIN = this.fixedSizeByteArrayAsHexString(50);
        String art4HexLongBin = this.fixedSizeByteArrayAsHexString(32700);
        attributes = new HashMap();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "3");
        attributes.put("sql.args.2.type", String.valueOf(-2));
        attributes.put("sql.args.2.value", arg2HexBIN);
        attributes.put("sql.args.2.format", "hex");
        attributes.put("sql.args.3.type", String.valueOf(-3));
        attributes.put("sql.args.3.value", art3HexVARBIN);
        attributes.put("sql.args.3.format", "hex");
        attributes.put("sql.args.4.type", String.valueOf(-4));
        attributes.put("sql.args.4.value", art4HexLongBin);
        attributes.put("sql.args.4.format", "hex");
        runner.enqueue(insertStatement, attributes);
        String arg2Base64BIN = this.fixedSizeByteArrayAsBase64String(8);
        String art3Base64VARBIN = this.fixedSizeByteArrayAsBase64String(50);
        String art4Base64LongBin = this.fixedSizeByteArrayAsBase64String(32700);
        attributes = new HashMap();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "4");
        attributes.put("sql.args.2.type", String.valueOf(-2));
        attributes.put("sql.args.2.value", arg2Base64BIN);
        attributes.put("sql.args.2.format", "base64");
        attributes.put("sql.args.3.type", String.valueOf(-3));
        attributes.put("sql.args.3.value", art3Base64VARBIN);
        attributes.put("sql.args.3.format", "base64");
        attributes.put("sql.args.4.type", String.valueOf(-4));
        attributes.put("sql.args.4.value", art4Base64LongBin);
        attributes.put("sql.args.4.format", "base64");
        runner.enqueue(insertStatement, attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 4);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM BINARYTESTS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertArrayEquals((byte[])arg2BIN.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(2));
            Assertions.assertArrayEquals((byte[])art3VARBIN.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(3));
            Assertions.assertArrayEquals((byte[])art4LongBin.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(4));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)2, (int)rs.getInt(1));
            Assertions.assertArrayEquals((byte[])arg2BIN.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(2));
            Assertions.assertArrayEquals((byte[])art3VARBIN.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(3));
            Assertions.assertArrayEquals((byte[])art4LongBin.getBytes(StandardCharsets.US_ASCII), (byte[])rs.getBytes(4));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)3, (int)rs.getInt(1));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseHexBinary((String)arg2HexBIN), (byte[])rs.getBytes(2));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseHexBinary((String)art3HexVARBIN), (byte[])rs.getBytes(3));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseHexBinary((String)art4HexLongBin), (byte[])rs.getBytes(4));
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)4, (int)rs.getInt(1));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseBase64Binary((String)arg2Base64BIN), (byte[])rs.getBytes(2));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseBase64Binary((String)art3Base64VARBIN), (byte[])rs.getBytes(3));
            Assertions.assertArrayEquals((byte[])DatatypeConverter.parseBase64Binary((String)art4Base64LongBin), (byte[])rs.getBytes(4));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testStatementsWithPreparedParameters() throws InitializationException, ProcessException, SQLException {
        ResultSet rs;
        Statement stmt;
        TestRunner runner = this.initTestRunner();
        this.recreateTable("PERSONS", createPersons);
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();){
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        runner.clearTransferState();
        attributes.clear();
        attributes.put("sql.args.1.type", String.valueOf(12));
        attributes.put("sql.args.1.value", "George");
        attributes.put("sql.args.2.type", String.valueOf(4));
        attributes.put("sql.args.2.value", "1");
        runner.enqueue("UPDATE PERSONS SET NAME=? WHERE ID=?".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        conn = service.getConnection();
        try {
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"George", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    @Test
    public void testMultipleStatementsWithinFlowFile() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        this.recreateTable("PERSONS", createPersons);
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
        TestPutSQL.assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testMultipleStatementsWithinFlowFileRollbackOnFailure() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        this.recreateTable("PERSONS", createPersons);
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ".getBytes(), attributes);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testWithNullParameter() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
            Assertions.assertEquals((int)0, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testInvalidStatement() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        this.recreateTable("PERSONS", createPersons);
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
        TestPutSQL.assertSQLExceptionRelatedAttributes(runner, PutSQL.REL_FAILURE);
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testInvalidStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        this.recreateTable("PERSONS", createPersons);
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE SOME_RANDOM_TABLE NAME='George' WHERE ID=?; ".getBytes(), attributes);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testRetryableFailure() throws InitializationException, ProcessException {
        TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        SQLExceptionService service = new SQLExceptionService(null);
        runner.addControllerService("dbcp", (ControllerService)service);
        runner.enableControllerService((ControllerService)service);
        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 1);
        TestPutSQL.assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_RETRY);
    }

    @Test
    public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException {
        TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        SQLExceptionService service = new SQLExceptionService(null);
        runner.addControllerService("dbcp", (ControllerService)service);
        runner.enableControllerService((ControllerService)service);
        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ";
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("sql.args.4.type", String.valueOf(4));
        attributes.put("sql.args.4.value", "1");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); UPDATE PERSONS SET NAME='George' WHERE ID=?; ".getBytes(), attributes);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertAllFlowFilesTransferred(PutSQL.REL_RETRY, 0);
    }

    @Test
    public void testMultipleFlowFilesSuccessfulInTransaction() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "1");
        this.recreateTable("PERSONS", createPersons);
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "0");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
        attributes.clear();
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "1");
        runner.clearTransferState();
        runner.enqueue("UPDATE PERSONS SET NAME='Leonard' WHERE ID=1".getBytes(), attributes);
        runner.run();
        runner.assertTransferCount(PutSQL.REL_SUCCESS, 2);
        runner.assertTransferCount(PutSQL.REL_FAILURE, 0);
        runner.assertTransferCount(PutSQL.REL_RETRY, 0);
        for (MockFlowFile mff : runner.getFlowFilesForRelationship(PutSQL.REL_SUCCESS)) {
            mff.assertAttributeEquals("fragment.identifier", "1");
        }
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
            Assertions.assertTrue((boolean)rs.next());
            Assertions.assertEquals((int)1, (int)rs.getInt(1));
            Assertions.assertEquals((Object)"Leonard", (Object)rs.getString(2));
            Assertions.assertEquals((int)84, (int)rs.getInt(3));
            Assertions.assertFalse((boolean)rs.next());
        }
    }

    @Test
    public void testMultipleFlowFilesSuccessfulInTransactionRollBackOnFailure() throws InitializationException, ProcessException, SQLException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.BATCH_SIZE, "1");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        this.recreateTable("PERSONS", createPersons);
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("sql.args.1.type", String.valueOf(4));
        attributes.put("sql.args.1.value", "1");
        attributes.put("sql.args.2.type", String.valueOf(12));
        attributes.put("sql.args.2.value", "Mark");
        attributes.put("sql.args.3.type", String.valueOf(4));
        attributes.put("sql.args.3.value", "84");
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "0");
        runner.enqueue("INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?)".getBytes(), attributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 0);
    }

    @Test
    public void testTransactionTimeout() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
        final HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "0");
        MockFlowFile mff = new MockFlowFile(0L){

            public Long getLastQueueDate() {
                return System.currentTimeMillis() - 10000L;
            }

            public Map<String, String> getAttributes() {
                return attributes;
            }

            public String getAttribute(String attrName) {
                return (String)attributes.get(attrName);
            }
        };
        runner.enqueue(new FlowFile[]{mff});
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 1);
        TestPutSQL.assertNonSQLErrorRelatedAttributes(runner, PutSQL.REL_FAILURE);
    }

    @Test
    public void testTransactionTimeoutRollbackOnFailure() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        final HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("fragment.identifier", "1");
        attributes.put("fragment.count", "2");
        attributes.put("fragment.index", "0");
        MockFlowFile mff = new MockFlowFile(0L){

            public Long getLastQueueDate() {
                return System.currentTimeMillis() - 10000L;
            }

            public Map<String, String> getAttributes() {
                return attributes;
            }

            public String getAttribute(String attrName) {
                return (String)attributes.get(attrName);
            }
        };
        runner.enqueue(new FlowFile[]{mff});
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
    }

    @Test
    public void testNullFragmentCountRollbackOnFailure() throws InitializationException, ProcessException {
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.TRANSACTION_TIMEOUT, "5 secs");
        runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true");
        HashMap<String, String> attribute1 = new HashMap<String, String>();
        attribute1.put("fragment.identifier", "1");
        attribute1.put("fragment.count", "2");
        attribute1.put("fragment.index", "0");
        HashMap<String, String> attribute2 = new HashMap<String, String>();
        attribute2.put("fragment.identifier", "1");
        attribute2.put("fragment.index", "1");
        runner.enqueue(new byte[0], attribute1);
        runner.enqueue(new byte[0], attribute2);
        AssertionError e = (AssertionError)((Object)Assertions.assertThrows(AssertionError.class, () -> ((TestRunner)runner).run()));
        Assertions.assertInstanceOf(ProcessException.class, (Object)((Throwable)((Object)e)).getCause());
        runner.assertAllFlowFilesTransferred(PutSQL.REL_FAILURE, 0);
    }

    @Test
    public void testStatementsFromProperty() throws InitializationException, ProcessException, SQLException {
        ResultSet rs;
        Statement stmt;
        TestRunner runner = this.initTestRunner();
        runner.setProperty(PutSQL.SQL_STATEMENT, "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (${row.id}, 'Mark', 84)");
        this.recreateTable("PERSONS", createPersons);
        runner.enqueue("This statement should be ignored".getBytes(), Collections.singletonMap("row.id", "1"));
        runner.run();
        runner.assertAllFlowFilesTransferred(PutSQL.REL_SUCCESS, 1);
        try (Connection conn = service.getConnection();){
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"Mark", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        runner.setProperty(PutSQL.SQL_STATEMENT, "UPDATE PERSONS SET NAME='George' WHERE ID=${row.id}");
        runner.enqueue("This statement should be ignored".getBytes(), Collections.singletonMap("row.id", "1"));
        runner.run();
        conn = service.getConnection();
        try {
            stmt = conn.createStatement();
            try {
                rs = stmt.executeQuery("SELECT * FROM PERSONS");
                Assertions.assertTrue((boolean)rs.next());
                Assertions.assertEquals((int)1, (int)rs.getInt(1));
                Assertions.assertEquals((Object)"George", (Object)rs.getString(2));
                Assertions.assertEquals((int)84, (int)rs.getInt(3));
                Assertions.assertFalse((boolean)rs.next());
            }
            finally {
                if (stmt != null) {
                    stmt.close();
                }
            }
        }
        finally {
            if (conn != null) {
                conn.close();
            }
        }
    }

    @Test
    public void testTransactionalFlowFileFilter() {
        MockFlowFile ff0 = new MockFlowFile(0L);
        MockFlowFile ff1 = new MockFlowFile(1L);
        MockFlowFile ff2 = new MockFlowFile(2L);
        MockFlowFile ff3 = new MockFlowFile(3L);
        MockFlowFile ff4 = new MockFlowFile(4L);
        ff0.putAttributes(this.createFragmentedTransactionAttributes("tx-1", 3, 0));
        ff1.putAttributes(Collections.singletonMap("accept", "false"));
        ff2.putAttributes(this.createFragmentedTransactionAttributes("tx-1", 3, 1));
        ff3.putAttributes(Collections.singletonMap("accept", "true"));
        ff4.putAttributes(this.createFragmentedTransactionAttributes("tx-1", 3, 2));
        PutSQL.TransactionalFlowFileFilter txFilter = new PutSQL.TransactionalFlowFileFilter(null);
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff0));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff1));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff2));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff3));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE, (Object)txFilter.filter((FlowFile)ff4));
        txFilter = new PutSQL.TransactionalFlowFileFilter(null);
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff1));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff0));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff2));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff3));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff4));
        FlowFileFilter nonTxFilter = flowFile -> "true".equals(flowFile.getAttribute("accept")) ? FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE : FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE;
        txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff0));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff1));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff2));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff3));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_TERMINATE, (Object)txFilter.filter((FlowFile)ff4));
        txFilter = new PutSQL.TransactionalFlowFileFilter(nonTxFilter);
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff1));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff0));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff2));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.ACCEPT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff3));
        Assertions.assertEquals((Object)FlowFileFilter.FlowFileFilterResult.REJECT_AND_CONTINUE, (Object)txFilter.filter((FlowFile)ff4));
    }

    private void testFailInMiddleWithBadParameterType(TestRunner runner) throws ProcessException {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        HashMap<String, String> goodAttributes = new HashMap<String, String>();
        goodAttributes.put("sql.args.1.type", String.valueOf(4));
        goodAttributes.put("sql.args.1.value", "84");
        HashMap<String, String> badAttributes = new HashMap<String, String>();
        badAttributes.put("sql.args.1.type", String.valueOf(12));
        badAttributes.put("sql.args.1.value", "hello");
        byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);
    }

    private void testFailInMiddleWithBadParameterValue(TestRunner runner) throws ProcessException, SQLException {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        this.recreateTable("PERSONS_AI", createPersonsAutoId);
        HashMap<String, String> goodAttributes = new HashMap<String, String>();
        goodAttributes.put("sql.args.1.type", String.valueOf(4));
        goodAttributes.put("sql.args.1.value", "84");
        HashMap<String, String> badAttributes = new HashMap<String, String>();
        badAttributes.put("sql.args.1.type", String.valueOf(4));
        badAttributes.put("sql.args.1.value", "9999");
        byte[] data = "INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', ?)".getBytes();
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, badAttributes);
        runner.enqueue(data, goodAttributes);
        runner.enqueue(data, goodAttributes);
    }

    private void testFailInMiddleWithBadStatement(TestRunner runner) {
        runner.setProperty(PutSQL.OBTAIN_GENERATED_KEYS, "false");
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Mark', 84)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Tom', 3)".getBytes());
        runner.enqueue("INSERT INTO PERSONS_AI (NAME, CODE) VALUES ('Harry', 44)".getBytes());
    }

    private Map<String, String> createFragmentedTransactionAttributes(String id, int count, int index) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("fragment.identifier", id);
        attributes.put("fragment.count", String.valueOf(count));
        attributes.put("fragment.index", String.valueOf(index));
        return attributes;
    }

    private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
        try (Connection conn = service.getConnection();
             Statement stmt = conn.createStatement();){
            stmt.executeUpdate("drop table " + tableName);
            stmt.executeUpdate(createSQL);
        }
    }

    private String fixedSizeByteArrayAsASCIIString(int length) {
        byte[] bBinary = RandomUtils.nextBytes((int)length);
        ByteBuffer bytes = ByteBuffer.wrap(bBinary);
        StringBuilder sbBytes = new StringBuilder();
        for (int i = bytes.position(); i < bytes.limit(); ++i) {
            sbBytes.append((char)bytes.get(i));
        }
        return sbBytes.toString();
    }

    private String fixedSizeByteArrayAsHexString(int length) {
        byte[] bBinary = RandomUtils.nextBytes((int)length);
        return DatatypeConverter.printHexBinary((byte[])bBinary);
    }

    private String fixedSizeByteArrayAsBase64String(int length) {
        byte[] bBinary = RandomUtils.nextBytes((int)length);
        return DatatypeConverter.printBase64Binary((byte[])bBinary);
    }

    private TestRunner initTestRunner() throws InitializationException {
        TestRunner runner = TestRunners.newTestRunner(PutSQL.class);
        runner.addControllerService("dbcp", (ControllerService)service);
        runner.enableControllerService((ControllerService)service);
        runner.setProperty(PutSQL.CONNECTION_POOL, "dbcp");
        return runner;
    }

    private static File getEmptyDirectory() {
        String randomDirectory = String.format("%s-%s", TestPutSQL.class.getSimpleName(), UUID.randomUUID());
        return Paths.get(TestPutSQL.getSystemTemporaryDirectory(), randomDirectory).toFile();
    }

    private static void assertSQLExceptionRelatedAttributes(TestRunner runner, Relationship relationship) {
        List flowFiles = runner.getFlowFilesForRelationship(relationship);
        flowFiles.forEach(ff -> {
            ff.assertAttributeExists("error.message");
            ff.assertAttributeExists("error.code");
            ff.assertAttributeExists("error.sql.state");
        });
    }

    private static void assertNonSQLErrorRelatedAttributes(TestRunner runner, Relationship relationship) {
        List flowFiles = runner.getFlowFilesForRelationship(relationship);
        flowFiles.forEach(ff -> ff.assertAttributeExists("error.message"));
    }

    private static void assertOriginalAttributesAreKept(TestRunner runner) {
        runner.assertAllFlowFilesContainAttribute("sql.args.1.type");
        runner.assertAllFlowFilesContainAttribute("sql.args.1.value");
    }

    private static void assertErrorAttributesInTransaction(TestRunner runner, Relationship relationship) {
        List flowFiles = runner.getFlowFilesForRelationship(relationship);
        Assertions.assertEquals((long)1L, (long)flowFiles.stream().filter(TestPutSQL::errorAttributesAreSet).count(), (String)"Only one FlowFile should have the error attributes when transaction is used.");
    }

    private static void assertErrorAttributesNotSet(TestRunner runner, Relationship relationship) {
        List flowFiles = runner.getFlowFilesForRelationship(relationship);
        flowFiles.forEach(ff -> ff.assertAttributeNotExists("error.message"));
    }

    private static boolean errorAttributesAreSet(MockFlowFile ff) {
        return ff.getAttribute("error.message") != null && ff.getAttribute("error.code") != null && ff.getAttribute("error.sql.state") != null;
    }

    private static String getSystemTemporaryDirectory() {
        return System.getProperty("java.io.tmpdir");
    }

    private static class MockDBCPService
    extends AbstractControllerService
    implements DBCPService {
        private final String dbLocation;

        public MockDBCPService(String dbLocation) {
            this.dbLocation = dbLocation;
        }

        public String getIdentifier() {
            return "dbcp";
        }

        public Connection getConnection() throws ProcessException {
            try {
                Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
                Connection conn = DriverManager.getConnection("jdbc:derby:" + this.dbLocation + ";create=true");
                return conn;
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new ProcessException("getConnection failed: " + e);
            }
        }
    }

    private static class SQLExceptionService
    extends AbstractControllerService
    implements DBCPService {
        private final DBCPService service;
        private int allowedBeforeFailure = 0;
        private int successful = 0;

        public SQLExceptionService(DBCPService service) {
            this.service = service;
        }

        public String getIdentifier() {
            return "dbcp";
        }

        public Connection getConnection() throws ProcessException {
            try {
                if (++this.successful > this.allowedBeforeFailure) {
                    Connection conn = (Connection)Mockito.mock(Connection.class);
                    Mockito.when((Object)conn.prepareStatement((String)Mockito.any(String.class))).thenThrow(new Throwable[]{new SQLException("Unit Test Generated SQLException")});
                    return conn;
                }
                return this.service.getConnection();
            }
            catch (Exception e) {
                e.printStackTrace();
                throw new ProcessException("getConnection failed: " + e);
            }
        }
    }
}

