/*
 * Decompiled with CFR 0.152.
 */
package com.cloudera.sqoop;

import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.tool.CodeGenTool;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.tool.MergeTool;
import com.cloudera.sqoop.tool.SqoopTool;
import com.cloudera.sqoop.util.ClassLoaderStack;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class TestMerge
extends BaseSqoopTestCase {
    private static final Log LOG = LogFactory.getLog((String)TestMerge.class.getName());
    protected ConnManager manager;
    protected Connection conn;
    public static final String SOURCE_DB_URL = "jdbc:hsqldb:mem:merge";
    public static final String TABLE_NAME = "MergeTable";

    @Override
    public void setUp() {
        super.setUp();
        this.manager = this.getManager();
        try {
            this.conn = this.manager.getConnection();
        }
        catch (SQLException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    public Configuration newConf() {
        Configuration conf = new Configuration();
        if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
            conf.set("fs.defaultfs.name", "file:///");
        }
        conf.set("mapred.job.tracker", "local");
        return conf;
    }

    @Override
    public SqoopOptions getSqoopOptions(Configuration conf) {
        SqoopOptions options = new SqoopOptions(conf);
        options.setConnectString(HsqldbTestServer.getDbUrl());
        return options;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void createTable() throws SQLException {
        PreparedStatement s = this.conn.prepareStatement("DROP TABLE \"MergeTable\" IF EXISTS");
        try {
            s.executeUpdate();
        }
        finally {
            s.close();
        }
        s = this.conn.prepareStatement("CREATE TABLE \"MergeTable\" (id INT NOT NULL PRIMARY KEY, val INT, lastmod TIMESTAMP)");
        try {
            s.executeUpdate();
        }
        finally {
            s.close();
        }
        s = this.conn.prepareStatement("INSERT INTO \"MergeTable\" VALUES (0, 0, NOW())");
        try {
            s.executeUpdate();
        }
        finally {
            s.close();
        }
        s = this.conn.prepareStatement("INSERT INTO \"MergeTable\" VALUES (1, 42, NOW())");
        try {
            s.executeUpdate();
        }
        finally {
            s.close();
        }
        this.conn.commit();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testMerge() throws Exception {
        this.createTable();
        String MERGE_CLASS_NAME = "ClassForMerging";
        SqoopOptions options = this.getSqoopOptions(this.newConf());
        options.setTableName(TABLE_NAME);
        options.setClassName("ClassForMerging");
        CodeGenTool codeGen = new CodeGenTool();
        Sqoop codeGenerator = new Sqoop((SqoopTool)codeGen, options.getConf(), options);
        int ret = Sqoop.runSqoop((Sqoop)codeGenerator, (String[])new String[0]);
        if (0 != ret) {
            TestMerge.fail((String)("Nonzero exit from codegen: " + ret));
        }
        List jars = codeGen.getGeneratedJarFiles();
        String jarFileName = (String)jars.get(0);
        Path warehouse = new Path(BaseSqoopTestCase.LOCAL_WAREHOUSE_DIR);
        options = this.getSqoopOptions(this.newConf());
        options.setTableName(TABLE_NAME);
        options.setNumMappers(1);
        options.setTargetDir(new Path(warehouse, "merge-old").toString());
        options.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
        options.setIncrementalTestColumn("LASTMOD");
        ImportTool importTool = new ImportTool();
        Sqoop importer = new Sqoop((SqoopTool)importTool, options.getConf(), options);
        ret = Sqoop.runSqoop((Sqoop)importer, (String[])new String[0]);
        if (0 != ret) {
            TestMerge.fail((String)("Initial import failed with exit code " + ret));
        }
        this.assertRecordStartsWith("0,0,", "merge-old");
        this.assertRecordStartsWith("1,42,", "merge-old");
        long prevImportEnd = System.currentTimeMillis();
        Thread.sleep(25L);
        PreparedStatement s = this.conn.prepareStatement("UPDATE \"MergeTable\" SET val=43, lastmod=NOW() WHERE id=1");
        try {
            s.executeUpdate();
            this.conn.commit();
        }
        finally {
            s.close();
        }
        s = this.conn.prepareStatement("INSERT INTO \"MergeTable\" VALUES (3,313,NOW())");
        try {
            s.executeUpdate();
            this.conn.commit();
        }
        finally {
            s.close();
        }
        Thread.sleep(25L);
        options = this.getSqoopOptions(this.newConf());
        options.setTableName(TABLE_NAME);
        options.setNumMappers(1);
        options.setTargetDir(new Path(warehouse, "merge-new").toString());
        options.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
        options.setIncrementalTestColumn("LASTMOD");
        options.setIncrementalLastValue(new Timestamp(prevImportEnd).toString());
        importTool = new ImportTool();
        importer = new Sqoop((SqoopTool)importTool, options.getConf(), options);
        ret = Sqoop.runSqoop((Sqoop)importer, (String[])new String[0]);
        if (0 != ret) {
            TestMerge.fail((String)("Second import failed with exit code " + ret));
        }
        this.assertRecordStartsWith("1,43,", "merge-new");
        this.assertRecordStartsWith("3,313,", "merge-new");
        ClassLoaderStack.addJarFile((String)jarFileName, (String)"ClassForMerging");
        options = this.getSqoopOptions(this.newConf());
        options.setMergeOldPath(new Path(warehouse, "merge-old").toString());
        options.setMergeNewPath(new Path(warehouse, "merge-new").toString());
        options.setMergeKeyCol("ID");
        options.setTargetDir(new Path(warehouse, "merge-final").toString());
        options.setClassName("ClassForMerging");
        MergeTool mergeTool = new MergeTool();
        Sqoop merger = new Sqoop((SqoopTool)mergeTool, options.getConf(), options);
        ret = Sqoop.runSqoop((Sqoop)merger, (String[])new String[0]);
        if (0 != ret) {
            TestMerge.fail((String)("Merge failed with exit code " + ret));
        }
        this.assertRecordStartsWith("0,0,", "merge-final");
        this.assertRecordStartsWith("1,43,", "merge-final");
        this.assertRecordStartsWith("3,313,", "merge-final");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean checkFileForLine(FileSystem fs, Path p, String prefix) throws IOException {
        BufferedReader r = new BufferedReader(new InputStreamReader((InputStream)fs.open(p)));
        try {
            String in;
            while (null != (in = r.readLine())) {
                if (!in.startsWith(prefix)) continue;
                boolean bl = true;
                return bl;
            }
        }
        finally {
            r.close();
        }
        return false;
    }

    protected boolean recordStartsWith(String prefix, String dirName) throws Exception {
        Path warehousePath = new Path(LOCAL_WAREHOUSE_DIR);
        Path targetPath = new Path(warehousePath, dirName);
        LocalFileSystem fs = FileSystem.getLocal((Configuration)new Configuration());
        FileStatus[] files = fs.listStatus(targetPath);
        if (null == files || files.length == 0) {
            TestMerge.fail((String)"Got no import files!");
        }
        for (FileStatus stat : files) {
            Path p = stat.getPath();
            if (!p.getName().startsWith("part-") || !this.checkFileForLine((FileSystem)fs, p, prefix)) continue;
            return true;
        }
        return false;
    }

    protected void assertRecordStartsWith(String prefix, String dirName) throws Exception {
        if (!this.recordStartsWith(prefix, dirName)) {
            TestMerge.fail((String)("No record found that starts with " + prefix + " in " + dirName));
        }
    }
}

