/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;

public class TestRecoveryManager
extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestRecoveryManager.class);
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "test-recovery-manager");
    private FileSystem fs;
    private JobConf conf;
    private MiniMRCluster mr;

    protected void setUp() {
        JobConf conf = new JobConf();
        try {
            this.fs = FileSystem.get((Configuration)new Configuration());
            this.fs.delete(TEST_DIR, true);
            conf.set("mapred.jobtracker.job.history.block.size", "1024");
            conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
            this.mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    protected void tearDown() {
        ClusterStatus status = this.mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
        if (status.getJobTrackerState() == JobTracker.State.RUNNING) {
            this.mr.shutdown();
        }
    }

    public void testJobTrackerRestartsWithMissingJobFile() throws Exception {
        LOG.info((Object)"Testing jobtracker restart with faulty job");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        JobConf job1 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output1"), 2, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob1 = new JobClient(job1).submitJob(job1);
        LOG.info((Object)("Submitted job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        JobConf job2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output2"), 30, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info((Object)("Submitted job " + rJob2.getID()));
        while (rJob2.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob2.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        String sysDir = this.mr.getJobTrackerRunner().getJobTracker().getSystemDir();
        this.mr.stopJobTracker();
        Path jobFile = new Path(sysDir, rJob1.getID().toString() + "/" + "job-info");
        LOG.info((Object)("Deleting job token file : " + jobFile.toString()));
        this.fs.delete(jobFile, false);
        FSDataOutputStream out = this.fs.create(jobFile);
        out.write(1);
        out.close();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        ClusterStatus status = this.mr.getJobTrackerRunner().getJobTracker().getClusterStatus(false);
        TestRecoveryManager.assertEquals((String)"JobTracker crashed!", (Object)JobTracker.State.RUNNING, (Object)status.getJobTrackerState());
    }

    public void testJobResubmission() throws Exception {
        LOG.info((Object)"Testing Job Resubmission");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        TestRecoveryManager.assertEquals((String)"Resubmission failed ", (int)1, (int)jobtracker.getAllJobs().length);
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.isComplete()) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be successful"));
            this.fs.create(new Path(TEST_DIR, "signal"));
            UtilsForTests.waitFor(100L);
        }
        TestRecoveryManager.assertTrue((String)"Task should be successful", (boolean)rJob1.isSuccessful());
    }

    public void testJobTrackerRestartWithBadJobs() throws Exception {
        LOG.info((Object)"Testing recovery-manager");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, "test-recovery-manager", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        while (rJob1.mapProgress() < 0.5f) {
            LOG.info((Object)("Waiting for job " + rJob1.getID() + " to be 50% done"));
            UtilsForTests.waitFor(100L);
        }
        JobConf job2 = this.mr.createJobConf();
        String signalFile1 = new Path(TEST_DIR, "signal1").toString();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output4"), 20, 0, "test-recovery-manager", signalFile1, signalFile1);
        RunningJob rJob2 = new JobClient(job2).submitJob(job2);
        LOG.info((Object)("Submitted job " + rJob2.getID()));
        JobInProgress jip = jobtracker.getJob(rJob2.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        final JobConf job3 = this.mr.createJobConf();
        UserGroupInformation ugi3 = UserGroupInformation.createUserForTesting((String)"abc", (String[])new String[]{"users"});
        UtilsForTests.configureWaitingJobConf(job3, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output5"), 1, 0, "test-recovery-manager", signalFile, signalFile);
        RunningJob rJob3 = (RunningJob)ugi3.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<RunningJob>(){

            @Override
            public RunningJob run() throws IOException {
                return new JobClient(job3).submitJob(job3);
            }
        });
        LOG.info((Object)("Submitted job " + rJob3.getID() + " with different user"));
        jip = jobtracker.getJob(rJob3.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        LOG.info((Object)"Stopping jobtracker");
        this.mr.stopJobTracker();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        this.mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
        this.mr.getJobTrackerConf().setBoolean("mapred.acls.enabled", true);
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        this.mr.getJobTrackerConf().set(QueueManager.toFullPropertyName((String)"default", (String)QueueManager.QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());
        LOG.info((Object)"Starting jobtracker");
        this.mr.startJobTracker();
        UtilsForTests.waitForJobTracker(jc);
        jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        TestRecoveryManager.assertEquals((String)"Recovery manager failed to tolerate job failures", (int)1, (int)jobtracker.getAllJobs().length);
        JobStatus status = jobtracker.getJobStatus(rJob1.getID());
        TestRecoveryManager.assertNull((String)"Faulty job should not be resubmitted", (Object)status);
        jip = jobtracker.getJob(rJob2.getID());
        TestRecoveryManager.assertFalse((String)"Job should be running", (boolean)jip.isComplete());
        status = jobtracker.getJobStatus(rJob3.getID());
        TestRecoveryManager.assertNull((String)"Job should be missing because of ACL changed", (Object)status);
    }

    public void testRestartCount() throws Exception {
        LOG.info((Object)"Testing Job Restart Count");
        String signalFile = new Path(TEST_DIR, "signal").toString();
        this.mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        JobTracker jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
        JobConf job1 = this.mr.createJobConf();
        job1.setJobPriority(JobPriority.HIGH);
        UtilsForTests.configureWaitingJobConf(job1, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 30, 0, "test-restart", signalFile, signalFile);
        JobClient jc = new JobClient(job1);
        RunningJob rJob1 = jc.submitJob(job1);
        LOG.info((Object)("Submitted first job " + rJob1.getID()));
        JobInProgress jip = jobtracker.getJob(rJob1.getID());
        while (!jip.inited()) {
            LOG.info((Object)("Waiting for job " + jip.getJobID() + " to be inited"));
            UtilsForTests.waitFor(100L);
        }
        for (int i = 1; i <= 2; ++i) {
            LOG.info((Object)("Stopping jobtracker for " + i + " time"));
            this.mr.stopJobTracker();
            LOG.info((Object)("Starting jobtracker for " + i + " time"));
            this.mr.startJobTracker();
            UtilsForTests.waitForJobTracker(jc);
            jobtracker = this.mr.getJobTrackerRunner().getJobTracker();
            TestRecoveryManager.assertEquals((String)"Recovery manager failed to recover restart count", (int)0, (int)jip.getNumRestarts());
        }
        rJob1.killJob();
        JobConf job2 = this.mr.createJobConf();
        UtilsForTests.configureWaitingJobConf(job2, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0, "test-restart-manager", signalFile, signalFile);
        RunningJob rJob2 = jc.submitJob(job2);
        LOG.info((Object)("Submitted first job after restart" + rJob2.getID()));
        jip = jobtracker.getJob(rJob2.getID());
        TestRecoveryManager.assertEquals((String)"Restart count for new job is incorrect", (int)0, (int)jip.getNumRestarts());
        LOG.info((Object)"Stopping jobtracker for testing the fs errors");
        this.mr.stopJobTracker();
        Path rFile = jobtracker.recoveryManager.getRestartCountFile();
        this.fs.delete(rFile, false);
        FSDataOutputStream out = this.fs.create(rFile);
        out.writeBoolean(true);
        out.close();
        LOG.info((Object)"Starting jobtracker with fs errors");
        this.mr.startJobTracker();
        MiniMRCluster.JobTrackerRunner runner = this.mr.getJobTrackerRunner();
        TestRecoveryManager.assertFalse((String)"JobTracker is still alive", (boolean)runner.isActive());
    }

    public void testJobTrackerInfoCreation() throws Exception {
        LOG.info((Object)"Testing jobtracker.info file");
        MiniDFSCluster dfs = new MiniDFSCluster(new Configuration(), 1, true, null);
        String namenode = dfs.getFileSystem().getUri().getHost() + ":" + dfs.getFileSystem().getUri().getPort();
        dfs.shutdownDataNodes();
        JobConf conf = new JobConf();
        FileSystem.setDefaultUri((Configuration)conf, (String)namenode);
        conf.set("mapred.job.tracker", "localhost:0");
        conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
        JobTracker jobtracker = new JobTracker(conf);
        boolean failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        }
        catch (IOException ioe) {
            failed = true;
        }
        TestRecoveryManager.assertTrue((String)"JobTracker created info files without datanodes!!!", (boolean)failed);
        Path restartFile = jobtracker.recoveryManager.getRestartCountFile();
        Path tmpRestartFile = jobtracker.recoveryManager.getTempRestartCountFile();
        FileSystem fs = dfs.getFileSystem();
        TestRecoveryManager.assertFalse((String)"Info file exists after update failure", (boolean)fs.exists(restartFile));
        TestRecoveryManager.assertFalse((String)"Temporary restart-file exists after update failure", (boolean)fs.exists(restartFile));
        dfs.startDataNodes((Configuration)conf, 1, true, null, null, null, null);
        dfs.waitActive();
        failed = false;
        try {
            jobtracker.recoveryManager.updateRestartCount();
        }
        catch (IOException ioe) {
            failed = true;
        }
        TestRecoveryManager.assertFalse((String)"JobTracker failed to create info files with datanodes!!!", (boolean)failed);
    }
}

