/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskInProgress;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

public class TestKillSubProcesses
extends TestCase {
    // Logger shared by the test driver and the nested mapper classes.
    // NOTE(review): LOG and the path fields below are never reassigned in this file and
    // could probably be declared final — confirm nothing outside this file relies on them.
    private static volatile Log LOG = LogFactory.getLog(TestKillSubProcesses.class);
    // Absolute path of the "test.build.data" system property, falling back to /tmp.
    private static String BASE_TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
    // Working directory of this test underneath the base test directory.
    private static String TEST_ROOT_DIR = BASE_TEST_ROOT_DIR + "/" + "killSubProcesses";
    // Directory that receives the generated shell script and the childPidFile<level> files.
    private static Path scriptDir = new Path(TEST_ROOT_DIR, "script");
    // Plain path form of scriptDir, used when building file names and the script text.
    private static String scriptDirName = scriptDir.toUri().getPath();
    // The waiting mappers poll for this file; signalTask() creates it to release them.
    private static Path signalFile = new Path(TEST_ROOT_DIR + "/script/signalFile");
    // Client used to fetch map task reports; re-created by runJobAndSetProcessHandle().
    private static JobClient jobClient = null;
    // Mini cluster under test; package-private, assigned in testJobKillFailAndSucceed().
    static MiniMRCluster mr = null;
    // Pid of the map task of the most recently started job; reset by runJobAndSetProcessHandle().
    private static String pid = null;
    // Argument handed to the generated script; the script re-invokes itself with $1-1 until it
    // reaches 0, so pid files exist for levels 0..numLevelsOfSubProcesses.
    private static int numLevelsOfSubProcesses = 4;

    /**
     * Starts a job whose mapper spawns a subprocess tree and then sleeps until interrupted,
     * kills the job, and verifies that the map task and its descendants are gone afterwards.
     *
     * @param jt job tracker of the running mini cluster
     * @param conf job configuration; its job name and mapper class are overwritten here
     * @throws IOException if submitting, killing or querying the job fails
     */
    private static void runKillingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
        conf.setJobName("testkilljobsubprocesses");
        conf.setMapperClass(KillingMapperWithChildren.class);
        RunningJob job = TestKillSubProcesses.runJobAndSetProcessHandle(jt, conf);
        job.killJob();
        // Poll until the cleanup phase reports some progress (or this thread is interrupted).
        while (job.cleanupProgress() == 0.0f) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ie) {
                LOG.warn("sleep is interrupted:" + ie);
                break;
            }
        }
        TestKillSubProcesses.validateKillingSubprocesses(job, conf);
        // JUnit's signature is assertEquals(expected, actual); passing the expected value first
        // keeps the "expected:<..> but was:<..>" failure message truthful.
        // NOTE(review): 5 is presumably JobStatus.KILLED — confirm against JobStatus.
        TestKillSubProcesses.assertEquals(5, job.getJobState());
    }

    /**
     * Starts a job whose mapper spawns a subprocess tree and throws once the signal file exists,
     * with a single map attempt allowed, and verifies that the map task and its descendants are
     * gone after the job has completed.
     *
     * @param jt job tracker of the running mini cluster
     * @param conf job configuration; job name, mapper class and max map attempts are overwritten
     * @throws IOException if submitting or querying the job fails
     */
    private static void runFailingJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
        conf.setJobName("testfailjobsubprocesses");
        conf.setMapperClass(FailingMapperWithChildren.class);
        conf.setMaxMapAttempts(1);
        RunningJob job = TestKillSubProcesses.runJobAndSetProcessHandle(jt, conf);
        // Creating the signal file lets the mapper leave its wait loop and throw.
        TestKillSubProcesses.signalTask(signalFile.toString(), conf);
        TestKillSubProcesses.validateKillingSubprocesses(job, conf);
        // JUnit's signature is assertEquals(expected, actual); passing the expected value first
        // keeps the failure message truthful.
        // NOTE(review): 3 is presumably JobStatus.FAILED — confirm against JobStatus.
        TestKillSubProcesses.assertEquals(3, job.getJobState());
    }

    /**
     * Starts a job whose mapper spawns a subprocess tree and returns normally once the signal
     * file exists, and verifies that the map task and its descendants are gone after the job
     * has completed.
     *
     * @param jt job tracker of the running mini cluster
     * @param conf job configuration; its job name and mapper class are overwritten here
     * @throws IOException if submitting or querying the job fails
     */
    private static void runSuccessfulJobAndValidate(JobTracker jt, JobConf conf) throws IOException {
        conf.setJobName("testsucceedjobsubprocesses");
        conf.setMapperClass(MapperWithChildren.class);
        RunningJob job = TestKillSubProcesses.runJobAndSetProcessHandle(jt, conf);
        // Creating the signal file lets the mapper leave its wait loop and finish.
        TestKillSubProcesses.signalTask(signalFile.toString(), conf);
        TestKillSubProcesses.validateKillingSubprocesses(job, conf);
        // JUnit's signature is assertEquals(expected, actual); passing the expected value first
        // keeps the failure message truthful.
        // NOTE(review): 2 is presumably JobStatus.SUCCEEDED — confirm against JobStatus.
        TestKillSubProcesses.assertEquals(2, job.getJobState());
    }

    /**
     * Submits the job, waits for its map task to be launched, records the task's pid in the
     * static {@code pid} field and asserts that the task (and, when setsid is available, every
     * level of its subprocess tree) is alive.
     *
     * @param jt job tracker used to look up the {@link JobInProgress} of the submitted job
     * @param conf configuration of the job to submit
     * @return handle of the submitted job
     * @throws IOException if submitting the job or querying the cluster fails
     */
    private static RunningJob runJobAndSetProcessHandle(JobTracker jt, JobConf conf) throws IOException {
        RunningJob job = TestKillSubProcesses.runJob(conf);
        // NOTE(review): 1 is presumably JobStatus.RUNNING — confirm against JobStatus.
        while (job.getJobState() != 1) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                break;
            }
        }
        pid = null;
        jobClient = new JobClient(conf);
        TaskReport[] mapReports = jobClient.getMapTaskReports(job.getID());
        JobInProgress jip = jt.getJob(job.getID());
        for (TaskReport tr : mapReports) {
            TaskInProgress tip = jip.getTaskInProgress(tr.getTaskID());
            // Wait for at least one attempt of this map task to become active.
            while (tip.getActiveTasks().size() == 0) {
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    LOG.warn("sleep is interrupted:" + ie);
                    break;
                }
            }
            for (TaskAttemptID id : tip.getActiveTasks().keySet()) {
                LOG.info("taskAttemptID of map task is " + id);
                // Poll the task tracker until it knows the pid of the attempt.
                while (pid == null) {
                    pid = mr.getTaskTrackerRunner(0).getTaskTracker().getPid(id);
                    if (pid != null) {
                        break;
                    }
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException ie) {
                        // Stop waiting on interrupt, like every other polling loop in this class,
                        // instead of silently swallowing it and spinning on.
                        LOG.warn("sleep is interrupted:" + ie);
                        break;
                    }
                }
                // Fail with a clear message rather than passing null on to isAlive().
                TestKillSubProcesses.assertNotNull("Could not obtain the pid of map task attempt " + id, pid);
                LOG.info("pid of map task is " + pid);
                TestKillSubProcesses.assertTrue("Map is no more alive", TestKillSubProcesses.isAlive(pid));
                LOG.info("The map task is alive before Job completion, as expected.");
            }
        }
        if (ProcessTree.isSetsidAvailable) {
            // The generated script writes the level-0 pid file last (it recurses from the highest
            // level down to 0), so wait for that one before checking every level.
            String childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0);
            while (childPid == null) {
                LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
                try {
                    Thread.sleep(500L);
                }
                catch (InterruptedException ie) {
                    LOG.warn("sleep is interrupted:" + ie);
                    break;
                }
                childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0);
            }
            for (int i = 0; i <= numLevelsOfSubProcesses; ++i) {
                childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + i);
                LOG.info("pid of the descendant process at level " + i + "in the subtree of processes(with the map task as the root)" + " is " + childPid);
                // A missing pid file is reported as such instead of surfacing as an NPE later.
                TestKillSubProcesses.assertNotNull("Could not read the pid of the subprocess at level " + i + " from " + scriptDirName + "/childPidFile" + i, childPid);
                TestKillSubProcesses.assertTrue("Unexpected: The subprocess at level " + i + " in the subtree is not alive before Job completion", TestKillSubProcesses.isAlive(childPid));
            }
        }
        return job;
    }

    /**
     * Waits for the job to complete, then asserts that the recorded map task pid and (when
     * setsid is available) every process of its subprocess tree are no longer alive. Finally
     * removes the script directory from the local file system.
     *
     * @param job job to wait for
     * @param conf job configuration (not read by this method)
     * @throws IOException if querying the job, the process list or the file system fails
     */
    private static void validateKillingSubprocesses(RunningJob job, JobConf conf) throws IOException {
        // Poll for completion; an interrupt ends the wait early.
        for (;;) {
            if (job.isComplete()) {
                break;
            }
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException interrupted) {
                break;
            }
        }

        boolean mapTaskAlive = ProcessTree.isAlive(pid);
        TestKillSubProcesses.assertTrue(!mapTaskAlive);
        LOG.info("The map task is not alive after Job is completed, as expected.");

        if (ProcessTree.isSetsidAvailable) {
            int level = 0;
            while (level <= numLevelsOfSubProcesses) {
                String descendantPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + level);
                LOG.info("pid of the descendant process at level " + level + "in the subtree of processes(with the map task as the root)" + " is " + descendantPid);
                String failureMessage = "Unexpected: The subprocess at level " + level + " in the subtree is alive after Job completion";
                boolean descendantAlive = TestKillSubProcesses.isAlive(descendantPid);
                TestKillSubProcesses.assertTrue(failureMessage, !descendantAlive);
                ++level;
            }
        }

        // Clean up the script and pid files written for this job.
        LocalFileSystem localFs = FileSystem.getLocal(mr.createJobConf());
        if (localFs.exists(scriptDir)) {
            localFs.delete(scriptDir, true);
        }
    }

    /**
     * Prepares and submits a single-map, zero-reduce job.
     *
     * <p>Input and output directories are relative ("killjob/...") when the default file system
     * differs from the local one, and live under the test root directory otherwise. Any script
     * directory left over from an earlier run is removed, and {@code -Dtest.build.data} is
     * appended to the map and reduce child JVM options.
     *
     * @param conf job configuration; modified in place
     * @return handle of the submitted job
     * @throws IOException if file system access or job submission fails
     */
    private static RunningJob runJob(JobConf conf) throws IOException {
        Path outDir;
        Path inDir;
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        FileSystem tempFs = FileSystem.get((Configuration)conf);
        if (!tempFs.getUri().toASCIIString().equals(fs.getUri().toASCIIString())) {
            inDir = new Path("killjob/input");
            outDir = new Path("killjob/output");
        } else {
            inDir = new Path(TEST_ROOT_DIR, "input");
            outDir = new Path(TEST_ROOT_DIR, "output");
        }
        if (fs.exists(scriptDir)) {
            fs.delete(scriptDir, true);
        }
        conf.setNumMapTasks(1);
        conf.setNumReduceTasks(0);
        String testDataOpt = "-Dtest.build.data=" + BASE_TEST_ROOT_DIR;
        String commonOpts = conf.get("mapred.child.java.opts");
        conf.set("mapred.map.child.java.opts", TestKillSubProcesses.appendJavaOpt(conf.get("mapred.map.child.java.opts", commonOpts), testDataOpt));
        conf.set("mapred.reduce.child.java.opts", TestKillSubProcesses.appendJavaOpt(conf.get("mapred.reduce.child.java.opts", commonOpts), testDataOpt));
        return UtilsForTests.runJob(conf, inDir, outDir);
    }

    /**
     * Appends one JVM option to an existing option string. A null or empty existing string
     * yields just the new option, so the literal text "null" never ends up in the child JVM
     * options when neither option key is configured.
     */
    private static String appendJavaOpt(String existingOpts, String extraOpt) {
        if (existingOpts == null || existingOpts.length() == 0) {
            return extraOpt;
        }
        return existingOpts + " " + extraOpt;
    }

    /**
     * Test entry point: starts a one-tracker mini cluster on the local file system with the
     * sleep-before-SIGKILL interval set to zero, runs the kill/fail/succeed scenarios via
     * {@link #runTests(JobConf, JobTracker)} and always shuts the cluster down afterwards.
     * Does nothing on Windows.
     *
     * @throws IOException if the cluster or any of the jobs fails with an I/O error
     */
    public void testJobKillFailAndSucceed() throws IOException {
        if (Shell.WINDOWS) {
            System.out.println("setsid doesn't work on WINDOWS as expected. Not testing");
            return;
        }
        try {
            JobConf clusterConf = new JobConf();
            clusterConf.setLong("mapred.tasktracker.tasks.sleeptime-before-sigkill", 0L);
            mr = new MiniMRCluster(1, "file:///", 1, null, null, clusterConf);

            JobConf jobConf = mr.createJobConf();
            JobTracker tracker = mr.getJobTrackerRunner().getJobTracker();
            this.runTests(jobConf, tracker);
        }
        finally {
            // mr stays null when the cluster constructor itself failed.
            if (mr != null) {
                mr.shutdown();
            }
        }
    }

    /**
     * Ensures the test root directory exists with rwx permissions for everyone, then runs the
     * killed-job, failed-job and successful-job scenarios in that order.
     *
     * @param conf job configuration shared by the three scenarios
     * @param jt job tracker of the running cluster
     * @throws IOException if file system access or any of the jobs fails with an I/O error
     */
    void runTests(JobConf conf, JobTracker jt) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal(mr.createJobConf());
        Path testRoot = new Path(TEST_ROOT_DIR);
        boolean rootPresent = localFs.exists(testRoot);
        if (!rootPresent) {
            localFs.mkdirs(testRoot);
        }
        FsPermission openToEveryone = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
        localFs.setPermission(testRoot, openToEveryone);

        TestKillSubProcesses.runKillingJobAndValidate(jt, conf);
        TestKillSubProcesses.runFailingJobAndValidate(jt, conf);
        TestKillSubProcesses.runSuccessfulJobAndValidate(jt, conf);
    }

    /**
     * Creates the given file on the local file system; the waiting mappers poll for it.
     * Best effort: an I/O failure is logged as a warning and not propagated.
     *
     * @param signalFile path of the file to create
     * @param conf configuration used to obtain the local file system
     */
    private static void signalTask(String signalFile, JobConf conf) {
        try {
            FileSystem.getLocal(conf).createNewFile(new Path(signalFile));
        }
        catch (IOException ioe) {
            LOG.warn("Unable to create signal file. " + ioe);
        }
    }

    /**
     * Called from the map task: when setsid is available, writes a self-recursive shell script
     * into the script directory, launches it, and waits until the deepest process (level 0) has
     * written its pid file.
     *
     * <p>Each invocation of the script writes its own pid to {@code childPidFile<level>} and
     * re-invokes itself with {@code level - 1}; level 0 loops on {@code sleep 2} forever.
     *
     * @param conf configuration used to obtain the local file system
     * @throws IOException if the script cannot be written or launched
     */
    private static void runChildren(JobConf conf) throws IOException {
        if (!ProcessTree.isSetsidAvailable) {
            return;
        }
        LocalFileSystem fs = FileSystem.getLocal((Configuration)conf);
        if (fs.exists(scriptDir)) {
            fs.delete(scriptDir, true);
        }
        fs.mkdirs(scriptDir);
        fs.setPermission(scriptDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
        Random rm = new Random();
        Path scriptPath = new Path(scriptDirName, "_shellScript_" + rm.nextInt() + ".sh");
        String shellScript = scriptPath.toString();
        String script = "umask 000\necho $$ > " + scriptDirName + "/childPidFile" + "$1\n" + "echo hello\n" + "trap 'echo got SIGTERM' 15 \n" + "if [ $1 != 0 ]\nthen\n" + " sh " + shellScript + " $(($1-1))\n" + "else\n" + " while true\n do\n" + "  sleep 2\n" + " done\n" + "fi";
        FSDataOutputStream file = fs.create(scriptPath);
        try {
            file.writeBytes(script);
        }
        finally {
            // Release the stream even when writing the script fails.
            file.close();
        }
        if (!new File(scriptPath.toUri().getPath()).setExecutable(true)) {
            // Not fatal by itself, but the exec below is likely to fail; make the cause visible.
            LOG.warn("Unable to mark " + shellScript + " as executable");
        }
        LOG.info("Calling script from map task : " + shellScript);
        // Pass the command as an argument array so the script path is never re-tokenized on
        // whitespace.
        Runtime.getRuntime().exec(new String[]{shellScript, String.valueOf(numLevelsOfSubProcesses)});
        // Wait for the level-0 process, which is the last one to write its pid file.
        String childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0);
        while (childPid == null) {
            LOG.warn(scriptDirName + "/childPidFile" + 0 + " is null; Sleeping...");
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ie) {
                LOG.warn("sleep is interrupted:" + ie);
                break;
            }
            childPid = UtilsForTests.getPidFromPidFile(scriptDirName + "/childPidFile" + 0);
        }
    }

    /**
     * Checks whether a process with the given pid shows up in the output of
     * {@code ps -o pid,command -e}.
     *
     * <p>Only the first two whitespace-separated fields of each line are inspected (the pid and
     * the first word of the command). Lines whose command word contains "ps" or "grep" are
     * ignored.
     *
     * @param pid pid to look for; {@code null} is treated as "not alive", so callers that must
     *            tell a missing pid from a dead process should check for null themselves
     * @return {@code true} if a matching line is found; {@code false} otherwise, including when
     *         {@code ps} exits with a non-zero code
     * @throws IOException if running {@code ps} fails for any other reason
     */
    private static boolean isAlive(String pid) throws IOException {
        if (pid == null) {
            // No pid means there is no process to look for; avoids an NPE in the comparison below.
            return false;
        }
        String commandString = "ps -o pid,command -e";
        String[] args = new String[]{"bash", "-c", commandString};
        Shell.ShellCommandExecutor shExec = new Shell.ShellCommandExecutor(args);
        try {
            shExec.execute();
        }
        catch (Shell.ExitCodeException e) {
            return false;
        }
        catch (IOException e) {
            LOG.warn("IOExecption thrown while checking if process is alive" + StringUtils.stringifyException((Throwable)e));
            throw e;
        }
        String output = shExec.getOutput();
        StringTokenizer strTok = new StringTokenizer(output, "\n");
        while (strTok.hasMoreTokens()) {
            StringTokenizer pidToken = new StringTokenizer(strTok.nextToken(), " ");
            if (pidToken.countTokens() < 2) {
                // Skip lines without both a pid and a command instead of failing with
                // NoSuchElementException.
                continue;
            }
            String pidStr = pidToken.nextToken();
            String commandStr = pidToken.nextToken();
            if (pid.equals(pidStr) && !commandStr.contains("ps") && !commandStr.contains("grep")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Mapper that spawns the subprocess tree (through the inherited configure step), waits for
     * the signal file to appear and then always fails with a {@link RuntimeException}.
     */
    static class FailingMapperWithChildren
    extends MapperWithChildren {
        FailingMapperWithChildren() {
        }

        @Override
        public void configure(JobConf conf) {
            super.configure(conf);
        }

        /**
         * Reports progress and waits in one-second steps until the signal file exists or the
         * thread is interrupted, then throws.
         *
         * @throws IOException if checking for the signal file fails
         * @throws RuntimeException always, once the wait is over
         */
        @Override
        public void map(WritableComparable key, Writable value, OutputCollector<WritableComparable, Writable> out, Reporter reporter) throws IOException {
            for (;;) {
                if (this.fs.exists(signalFile)) {
                    break;
                }
                try {
                    reporter.progress();
                    synchronized (this) {
                        this.wait(1000L);
                    }
                }
                catch (InterruptedException interrupted) {
                    System.out.println("Interrupted while the map was waiting for  the signal.");
                    break;
                }
            }
            throw new RuntimeException("failing map");
        }
    }

    /**
     * Mapper that spawns the subprocess tree (through the inherited configure step) and then
     * sleeps until its thread is interrupted; it never looks at the signal file.
     */
    static class KillingMapperWithChildren
    extends MapperWithChildren {
        KillingMapperWithChildren() {
        }

        @Override
        public void configure(JobConf conf) {
            super.configure(conf);
        }

        /**
         * Sleeps in one-second steps forever; returns only when the sleep is interrupted, after
         * logging a warning.
         */
        @Override
        public void map(WritableComparable key, Writable value, OutputCollector<WritableComparable, Writable> out, Reporter reporter) throws IOException {
            for (;;) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException interrupted) {
                    LOG.warn("Exception in KillMapperWithChild.map:" + interrupted);
                    return;
                }
            }
        }
    }

    /**
     * Mapper that spawns the subprocess tree while being configured and whose map call blocks
     * until the signal file appears on the local file system.
     */
    static class MapperWithChildren
    extends MapReduceBase
    implements Mapper<WritableComparable, Writable, WritableComparable, Writable> {
        /** Local file system, obtained in {@link #configure(JobConf)}; null until then. */
        FileSystem fs = null;

        MapperWithChildren() {
        }

        /**
         * Obtains the local file system and starts the child processes. Any exception is logged
         * as a warning and not propagated.
         */
        public void configure(JobConf conf) {
            try {
                this.fs = FileSystem.getLocal(conf);
                TestKillSubProcesses.runChildren(conf);
            }
            catch (Exception failure) {
                LOG.warn("Exception in configure: " + StringUtils.stringifyException(failure));
            }
        }

        /**
         * Reports progress and waits in one-second steps until the signal file exists or the
         * thread is interrupted, then returns normally.
         *
         * @throws IOException if checking for the signal file fails
         */
        public void map(WritableComparable key, Writable value, OutputCollector<WritableComparable, Writable> out, Reporter reporter) throws IOException {
            for (;;) {
                if (this.fs.exists(signalFile)) {
                    return;
                }
                try {
                    reporter.progress();
                    synchronized (this) {
                        this.wait(1000L);
                    }
                }
                catch (InterruptedException interrupted) {
                    System.out.println("Interrupted while the map was waiting for  the signal.");
                    return;
                }
            }
        }
    }
}

