/*
 * Decompiled with CFR 0.152.
 */
package com.teradata.connector.teradata;

import com.teradata.connector.common.utils.StandardCharsets;
import com.teradata.connector.teradata.TeradataInputFormat;
import com.teradata.connector.teradata.TeradataSplitByPartitionInputFormat;
import com.teradata.connector.teradata.db.TeradataConnection;
import com.teradata.connector.teradata.processor.TeradataSplitByPartitionProcessor;
import com.teradata.connector.teradata.utils.TeradataPlugInConfiguration;
import com.teradata.connector.teradata.utils.TeradataSchemaUtils;
import java.sql.SQLException;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class TeradataSplitByPartitionInputFormatTest {
    private static String dbJdbcUrl;
    private static String dbIpAddress;
    private static String dbDatabase;
    private static String dbUsername;
    private static String dbPassword;
    private static String dbJdbcDriverClass;
    private static TeradataConnection dbConnection;
    protected static final int VALUE_PARTITION_ID_TYPE = 4;
    protected static final String COLUMN_PARTITION_ID = "TDIN_PARTID";
    protected static final String COLUMN_PARTITION = "PARTITION";
    protected static final int PARTITION_RANGE_MIN = 10;

    @BeforeClass
    public static void initializeTests() throws Exception {
        dbIpAddress = System.getProperty("test.database.ipaddress", "10.25.35.197");
        dbDatabase = System.getProperty("test.database.databasename", "tdch_testing2");
        dbUsername = System.getProperty("test.database.username", "tdch_testing2");
        dbPassword = System.getProperty("test.database.userpassword", "tdch_testing2");
        dbJdbcDriverClass = System.getProperty("test.database.jdbcdriver.class", "com.teradata.jdbc.TeraDriver");
        dbJdbcUrl = "jdbc:teradata://" + dbIpAddress + "/database=" + dbDatabase;
        dbConnection = new TeradataConnection(dbJdbcDriverClass, dbJdbcUrl, dbUsername, dbPassword, false);
        dbConnection.connect();
        TeradataSplitByPartitionInputFormatTest.createDatabaseTables();
    }

    @AfterClass
    public static void cleanupTests() throws Exception {
        TeradataSplitByPartitionInputFormatTest.dropDatabaseTables();
        dbConnection.close();
    }

    @Test
    public void testGetSplitsStagingEnabled1() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "4");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 4 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 0 AND 2"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 3 AND 5"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 6 AND 7"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 8 AND 10"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled2() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "10");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 10 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 1"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 2"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 3"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 4"));
                    break;
                }
                case 4: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 5"));
                    break;
                }
                case 5: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 6"));
                    break;
                }
                case 6: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 7"));
                    break;
                }
                case 7: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 8"));
                    break;
                }
                case 8: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 9"));
                    break;
                }
                case 9: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 10"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled3() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "12");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 10 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 1"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 2"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 3"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 4"));
                    break;
                }
                case 4: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 5"));
                    break;
                }
                case 5: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 6"));
                    break;
                }
                case 6: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 7"));
                    break;
                }
                case 7: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 8"));
                    break;
                }
                case 8: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 9"));
                    break;
                }
                case 9: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 10"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled4() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "1");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 1 ? 1 : 0) != 0);
        TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)inputSplits.get(0);
        Assert.assertTrue((boolean)tsplit.getSplitSql().contains("SELECT \"col1\", \"col2\", \"col3\" FROM \"" + dbDatabase + "\".\"splitbypartition_table_nopartitions\""));
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled5() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "4");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 4 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 0 AND 2"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 3 AND 5"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 6 AND 7"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 8 AND 10"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled6() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "2");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 2 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 0 AND 4"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 5 AND 10"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingEnabled7() throws Exception {
        boolean stagingEnabled = true;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "3");
        configuration.set("tdch.input.teradata.num.partitions", "32");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 3 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 1 AND 11"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 12 AND 21"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 22 AND 32"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled1() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "4");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 4 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 24 AND 55"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 83 AND 127"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 133 AND 162"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 170 AND 179"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled2() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "2");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 2 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 24 AND 110"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 127 AND 179"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled3() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "10");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 10 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 24"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 31"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 55"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 83"));
                    break;
                }
                case 4: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 110"));
                    break;
                }
                case 5: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 127"));
                    break;
                }
                case 6: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 133"));
                    break;
                }
                case 7: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 162"));
                    break;
                }
                case 8: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 170"));
                    break;
                }
                case 9: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 179"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled4() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "12");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 10 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 24"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 31"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 55"));
                    break;
                }
                case 3: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 83"));
                    break;
                }
                case 4: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 110"));
                    break;
                }
                case 5: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 127"));
                    break;
                }
                case 6: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 133"));
                    break;
                }
                case 7: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 162"));
                    break;
                }
                case 8: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 170"));
                    break;
                }
                case 9: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION = 179"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled5() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "1");
        configuration.set("tdch.input.teradata.num.partitions", "10");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_10partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 1 ? 1 : 0) != 0);
        TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)inputSplits.get(0);
        Assert.assertTrue((boolean)tsplit.getSplitSql().contains("SELECT \"col1\", \"col2\", \"col3\" FROM \"" + dbDatabase + "\".\"splitbypartition_table_10partitions\""));
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled6() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "3");
        configuration.set("tdch.input.teradata.num.partitions", "32");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_32partitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 3 ? 1 : 0) != 0);
        int count = 0;
        for (InputSplit split : inputSplits) {
            TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)split;
            switch (count) {
                case 0: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 12 AND 129"));
                    break;
                }
                case 1: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 130 AND 247"));
                    break;
                }
                case 2: {
                    Assert.assertTrue((boolean)tsplit.getSplitSql().contains(".PARTITION BETWEEN 248 AND 365"));
                    break;
                }
            }
            ++count;
        }
        processor.inputPostProcessor((JobContext)context);
    }

    @Test
    public void testGetSplitsStagingDisabled7() throws Exception {
        boolean stagingEnabled = false;
        TeradataSplitByPartitionProcessor processor = new TeradataSplitByPartitionProcessor();
        Job context = new Job();
        Configuration configuration = context.getConfiguration();
        configuration.set("tdch.num.mappers", "1");
        configuration.set("tdch.input.teradata.stage.table.enabled", stagingEnabled ? "true" : "false");
        TeradataPlugInConfiguration.setInputJdbcDriverClass(configuration, dbJdbcDriverClass);
        TeradataPlugInConfiguration.setInputJdbcUrl(configuration, dbJdbcUrl);
        TeradataPlugInConfiguration.setInputTable(configuration, dbDatabase + ".splitbypartition_table_nopartitions");
        TeradataPlugInConfiguration.setInputStageTableEnabled(configuration, stagingEnabled);
        TeradataPlugInConfiguration.setInputTeradataUserName((JobContext)context, dbUsername.getBytes(StandardCharsets.UTF_8));
        TeradataPlugInConfiguration.setInputTeradataPassword((JobContext)context, dbPassword.getBytes(StandardCharsets.UTF_8));
        TeradataSchemaUtils.setupTeradataSourceTableSchema(configuration, dbConnection);
        processor.inputPreProcessor((JobContext)context);
        TeradataSchemaUtils.configureSourceConnectorRecordSchema(configuration);
        TeradataSplitByPartitionInputFormat sbpInputFormat = new TeradataSplitByPartitionInputFormat();
        List<InputSplit> inputSplits = sbpInputFormat.getSplits((JobContext)context);
        Assert.assertTrue((inputSplits.size() == 1 ? 1 : 0) != 0);
        TeradataInputFormat.TeradataInputSplit tsplit = (TeradataInputFormat.TeradataInputSplit)inputSplits.get(0);
        Assert.assertTrue((boolean)tsplit.getSplitSql().contains("SELECT \"col1\", \"col2\", \"col3\" FROM \"" + dbDatabase + "\".\"splitbypartition_table_nopartitions\""));
        processor.inputPostProcessor((JobContext)context);
    }

    private static void createDatabaseTables() throws Exception {
        TeradataSplitByPartitionInputFormatTest.dropDatabaseTables();
        dbConnection.executeDDL("CREATE TABLE " + dbDatabase + ".splitbypartition_table_nodata (col1 INT, col2 INT, col3 VARCHAR(20)) PARTITION BY (col1), PRIMARY INDEX (col1);");
        dbConnection.executeDDL("CREATE TABLE " + dbDatabase + ".splitbypartition_table_nopartitions (col1 INT, col2 INT, col3 VARCHAR(20));");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (24,95,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (83,40,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (55,24,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (31,116,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (133,54,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (179,147,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (110,41,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (170,167,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (127,107,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_nopartitions VALUES (162,181,'hello world');");
        dbConnection.executeDDL("CREATE TABLE " + dbDatabase + ".splitbypartition_table_10partitions (col1 INT, col2 INT, col3 VARCHAR(20)) PARTITION BY (col1), PRIMARY INDEX (col1);");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (24,95,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (83,40,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (55,24,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (31,116,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (133,54,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (179,147,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (110,41,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (170,167,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (127,107,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_10partitions VALUES (162,181,'hello world');");
        dbConnection.executeDDL("CREATE TABLE " + dbDatabase + ".splitbypartition_table_32partitions (col1 INT, col2 INT, col3 VARCHAR(20)) PARTITION BY (col1), PRIMARY INDEX (col1);");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (24,95,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (83,40,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (55,24,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (31,116,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (133,54,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (179,147,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (110,41,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (170,167,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (127,107,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (319,88,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (162,181,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (33,200,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (72,101,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (89,26,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (101,11,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (102,76,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (122,34,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (131,298,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (144,97,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (181,43,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (201,40,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (176,35,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (207,90,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (213,81,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (287,63,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (265,204,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (365,243,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (344,222,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (307,178,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (234,333,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (12,71,'hello world');");
        dbConnection.executeDDL("INSERT INTO " + dbDatabase + ".splitbypartition_table_32partitions VALUES (45,302,'hello world');");
    }

    private static void dropDatabaseTables() throws Exception {
        try {
            dbConnection.dropTable(dbDatabase + ".splitbypartition_table_nodata");
        }
        catch (SQLException sQLException) {
            // empty catch block
        }
        try {
            dbConnection.dropTable(dbDatabase + ".splitbypartition_table_nopartitions");
        }
        catch (SQLException sQLException) {
            // empty catch block
        }
        try {
            dbConnection.dropTable(dbDatabase + ".splitbypartition_table_10partitions");
        }
        catch (SQLException sQLException) {
            // empty catch block
        }
        try {
            dbConnection.dropTable(dbDatabase + ".splitbypartition_table_32partitions");
        }
        catch (SQLException sQLException) {
            // empty catch block
        }
    }
}

