/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobHistory;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.QueueManager;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.TestEmptyJob;
import org.apache.hadoop.mapred.TestJobHistory;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Ignore;

@Ignore
public class TestJobTrackerRestart
extends TestCase {
    // Root directory for this test's files: "jt-restart-testing" under the
    // "test.build.data" system property (falls back to "/tmp").
    static final Path testDir = new Path(System.getProperty("test.build.data", "/tmp"), "jt-restart-testing");
    // Job input directory under testDir.
    final Path inDir = new Path(testDir, "input");
    // Directory under testDir in which the map/reduce signal files are placed.
    static final Path shareDir = new Path(testDir, "share");
    // Base job output directory under testDir; getJobs() appends a numeric suffix per job.
    final Path outputDir = new Path(testDir, "output");
    // Running count of job configurations built by getJobs(); used as the output-dir suffix.
    private static int numJobsSubmitted = 0;

    /**
     * Builds one "waiting job" configuration per entry of {@code priorities}.
     *
     * <p>Each configuration is a copy of {@code conf}, gets its own output
     * directory ({@code outputDir} suffixed with the running job counter), is
     * set up through {@link UtilsForTests#configureWaitingJobConf} and finally
     * receives the matching priority.
     *
     * @param conf             base configuration copied for every job
     * @param priorities       priority of each job; its length decides how many jobs are built
     * @param numMaps          map count per job, indexed like {@code priorities}
     * @param numReds          reduce count per job, indexed like {@code priorities}
     * @param outputDir        base output path; a numeric suffix is appended per job
     * @param inDir            input directory shared by all jobs
     * @param mapSignalFile    map signal file passed to the waiting-job setup
     * @param reduceSignalFile reduce signal file passed to the waiting-job setup
     * @return the configured job configurations, in the order of {@code priorities}
     * @throws IOException if configuring a job fails
     */
    private static JobConf[] getJobs(JobConf conf, JobPriority[] priorities, int[] numMaps, int[] numReds, Path outputDir, Path inDir, String mapSignalFile, String reduceSignalFile) throws IOException {
        final int jobCount = priorities.length;
        JobConf[] configured = new JobConf[jobCount];
        for (int idx = 0; idx < jobCount; idx++) {
            JobConf jobConf = new JobConf((Configuration) conf);
            configured[idx] = jobConf;
            // Every job gets a distinct output directory so runs do not collide.
            Path jobOutputDir = outputDir.suffix(String.valueOf(numJobsSubmitted++));
            UtilsForTests.configureWaitingJobConf(jobConf, inDir, jobOutputDir, numMaps[idx], numReds[idx], "jt restart test job", mapSignalFile, reduceSignalFile);
            jobConf.setJobPriority(priorities[idx]);
        }
        return configured;
    }

    /**
     * Deletes the map and reduce signal files located under {@code dir}.
     *
     * @param fileSys file system holding the signal files
     * @param dir     directory the signal file names are resolved against
     * @throws IOException if a delete call fails
     */
    private static void cleanUp(FileSystem fileSys, Path dir) throws IOException {
        Path mapSignal = new Path(getMapSignalFile(dir));
        fileSys.delete(mapSignal, false);
        Path reduceSignal = new Path(getReduceSignalFile(dir));
        fileSys.delete(reduceSignal, false);
    }

    /**
     * Restarts the job tracker with recovery disabled and checks that a job
     * submitted before the restart is no longer known afterwards.
     *
     * @param dfs mini DFS cluster backing the test
     * @param mr  mini MR cluster whose job tracker is restarted
     * @throws IOException if any cluster or file-system call fails
     */
    public void testRestartWithoutRecovery(MiniDFSCluster dfs, MiniMRCluster mr) throws IOException {
        final String mapSignal = getMapSignalFile(shareDir);
        final String reduceSignal = getReduceSignalFile(shareDir);
        FileSystem fs = dfs.getFileSystem();
        cleanUp(fs, shareDir);

        // One normal-priority job with 2 maps and no reduces.
        JobConf[] jobs = getJobs(mr.createJobConf(), new JobPriority[]{JobPriority.NORMAL}, new int[]{2}, new int[]{0}, this.outputDir, this.inDir, mapSignal, reduceSignal);
        JobConf jobConf = jobs[0];
        JobClient client = new JobClient(jobConf);
        RunningJob submitted = client.submitJob(jobConf);
        JobID jobId = submitted.getID();

        // Let the job reach at least 50% map progress before killing the tracker.
        while (UtilsForTests.getJobStatus(client, jobId).mapProgress() < 0.5f) {
            UtilsForTests.waitFor(100L);
        }

        mr.stopJobTracker();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", false);
        UtilsForTests.waitFor(60000L);
        mr.startJobTracker();

        UtilsForTests.signalTasks(dfs, fs, true, mapSignal, reduceSignal);
        UtilsForTests.waitForJobTracker(client);
        UtilsForTests.waitTillDone(client);

        // With recovery off the restarted tracker must not report the old job.
        boolean jobUnknown = UtilsForTests.getJobStatus(client, jobId) == null;
        assertTrue("Submitted job was detected with recovery disabled", jobUnknown);
    }

    /**
     * Restarts the job tracker with recovery enabled while a job is half way
     * through its maps, then verifies that task completion events, task
     * reports, the changed job priority, the job history and the cluster
     * status are consistent after the restart.
     *
     * @param dfs mini DFS cluster backing the test
     * @param mr  mini MR cluster whose job tracker is restarted
     * @throws IOException if any cluster or file-system call fails
     */
    public void testTaskEventsAndReportsWithRecovery(MiniDFSCluster dfs, MiniMRCluster mr) throws IOException {
        final int numMaps = 50;
        final int numReducers = 1;
        final String mapSignal = getMapSignalFile(shareDir);
        final String reduceSignal = getReduceSignalFile(shareDir);

        FileSystem fileSys = dfs.getFileSystem();
        cleanUp(fileSys, shareDir);

        JobConf newConf = getJobs(mr.createJobConf(), new JobPriority[]{JobPriority.NORMAL}, new int[]{numMaps}, new int[]{numReducers}, this.outputDir, this.inDir, mapSignal, reduceSignal)[0];
        JobClient jobClient = new JobClient(newConf);
        RunningJob job = jobClient.submitJob(newConf);
        JobID id = job.getID();

        // Change the priority before the restart so recovery of it can be checked later.
        mr.setJobPriority(id, JobPriority.HIGH);
        mr.initializeJob(id);

        // Wait until the cluster reports a reduce task.
        while (jobClient.getClusterStatus().getReduceTasks() == 0) {
            UtilsForTests.waitFor(100L);
        }

        // Wait until half of the map completion events are available.
        while (mr.getMapTaskCompletionEventsUpdates(0, id, numMaps).getMapTaskCompletionEvents().length < numMaps / 2) {
            UtilsForTests.waitFor(1000L);
        }

        // Snapshot the pre-restart state.
        TaskCompletionEvent[] prevEvents = mr.getTaskCompletionEvents(id, 0, numMaps);
        TaskReport[] prevSetupReports = jobClient.getSetupTaskReports(id);
        TaskReport[] prevMapReports = jobClient.getMapTaskReports(id);
        ClusterStatus prevStatus = jobClient.getClusterStatus();

        mr.stopJobTracker();
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        UtilsForTests.waitFor(60000L);
        mr.startJobTracker();

        // Release the maps only; reducers stay blocked until signalled below.
        UtilsForTests.signalTasks(dfs, fileSys, true, mapSignal, reduceSignal);
        UtilsForTests.waitForJobTracker(jobClient);

        int numToMatch = mr.getNumEventsRecovered() / 2;

        while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 1.0f) {
            UtilsForTests.waitFor(100L);
        }

        // Compare events and reports recorded before the restart with the recovered ones.
        TaskCompletionEvent[] jtEvents = mr.getTaskCompletionEvents(id, 0, 2 * numMaps);
        this.testTaskCompletionEvents(prevEvents, jtEvents, false, numToMatch);

        TaskReport[] afterMapReports = jobClient.getMapTaskReports(id);
        TaskReport[] afterSetupReports = jobClient.getSetupTaskReports(id);
        this.testTaskReports(prevMapReports, afterMapReports, numToMatch - 1);
        this.testTaskReports(prevSetupReports, afterSetupReports, 1);

        assertEquals("Job priority change is not reflected", (Object) JobPriority.HIGH, (Object) mr.getJobPriority(id));

        // Keep only the map events known to the job tracker.
        ArrayList<TaskCompletionEvent> jtMapEvents = new ArrayList<TaskCompletionEvent>();
        for (TaskCompletionEvent tce : jtEvents) {
            if (tce.isMapTask()) {
                jtMapEvents.add(tce);
            }
        }

        // Wait until the map-event update call returns as many events as the job tracker holds.
        TaskCompletionEvent[] trackerEvents = mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size()).getMapTaskCompletionEvents();
        while (trackerEvents.length < jtMapEvents.size()) {
            UtilsForTests.waitFor(1000L);
            trackerEvents = mr.getMapTaskCompletionEventsUpdates(0, id, jtMapEvents.size()).getMapTaskCompletionEvents();
        }

        // Now release the reducers and let the job finish.
        UtilsForTests.signalTasks(dfs, fileSys, false, mapSignal, reduceSignal);
        UtilsForTests.waitTillDone(jobClient);

        this.testTaskCompletionEvents(jtMapEvents.toArray(new TaskCompletionEvent[0]), trackerEvents, true, -1);

        TestJobHistory.validateJobHistoryFileFormat(id, newConf, "SUCCESS", true);
        TestJobHistory.validateJobHistoryFileContent(mr, job, newConf);

        ClusterStatus status = jobClient.getClusterStatus();
        assertTrue("Cluster status is insane", this.checkClusterStatusOnCompletion(status, prevStatus));
    }

    /**
     * Asserts that the first {@code numToMatch} task reports are pairwise equal.
     *
     * @param source     reports captured earlier
     * @param target     reports captured later
     * @param numToMatch number of leading entries to compare; nothing is
     *                   compared when it is zero or negative
     */
    private void testTaskReports(TaskReport[] source, TaskReport[] target, int numToMatch) {
        int idx = 0;
        while (idx < numToMatch) {
            boolean unchanged = source[idx].equals((Object) target[idx]);
            assertTrue("Task reports for same attempt has changed", unchanged);
            idx++;
        }
    }

    /**
     * Compares two task completion event lists position by position.
     *
     * <p>Positions whose task attempt ids differ are skipped; where the ids
     * agree the two events must be equal.
     *
     * @param source     reference events
     * @param target     events to check against the reference
     * @param fullMatch  when {@code true} both arrays must have the same length
     *                   and every position is compared, ignoring {@code numToMatch}
     * @param numToMatch number of leading positions to compare when
     *                   {@code fullMatch} is {@code false}
     */
    private void testTaskCompletionEvents(TaskCompletionEvent[] source, TaskCompletionEvent[] target, boolean fullMatch, int numToMatch) {
        int limit = numToMatch;
        if (fullMatch) {
            assertEquals("Map task completion events mismatch", source.length, target.length);
            limit = source.length;
        }
        for (int idx = 0; idx < limit; idx++) {
            TaskCompletionEvent expected = source[idx];
            TaskCompletionEvent actual = target[idx];
            // Only events describing the same attempt are required to be identical.
            if (expected.getTaskAttemptId().equals((Object) actual.getTaskAttemptId())) {
                assertTrue("Map task completion events ordering mismatch", expected.equals((Object) actual));
            }
        }
    }

    /**
     * Checks the cluster status observed after job completion.
     *
     * @param status     status read after the job finished
     * @param prevStatus status read before the job tracker restart
     * @return {@code true} when the job tracker state equals the earlier one
     *         and no map or reduce tasks are reported
     */
    private boolean checkClusterStatusOnCompletion(ClusterStatus status, ClusterStatus prevStatus) {
        if (status.getJobTrackerState() != prevStatus.getJobTrackerState()) {
            return false;
        }
        if (status.getMapTasks() != 0) {
            return false;
        }
        return status.getReduceTasks() == 0;
    }

    /**
     * Restarts the job tracker with recovery enabled after replacing one
     * job's history file with an empty file, then checks that both jobs
     * complete and that the old history and job-conf files no longer exist.
     *
     * @param dfs mini DFS cluster backing the test
     * @param mr  mini MR cluster whose job tracker is restarted
     * @throws IOException if any cluster or file-system call fails
     */
    public void testJobRecoveryWithEmptyHistory(MiniDFSCluster dfs, MiniMRCluster mr) throws IOException {
        // Start a task tracker on the mini cluster (argument meanings are defined by MiniMRCluster).
        mr.startTaskTracker(null, null, 1, 1);
        DistributedFileSystem fileSys = dfs.getFileSystem();
        // Remove signal files left under the share, input and output directories.
        TestJobTrackerRestart.cleanUp((FileSystem)fileSys, shareDir);
        TestJobTrackerRestart.cleanUp((FileSystem)fileSys, this.inDir);
        TestJobTrackerRestart.cleanUp((FileSystem)fileSys, this.outputDir);
        // Job 1: no reduces, uses the committer from TestEmptyJob that delays cleanup.
        JobConf conf = mr.createJobConf();
        conf.setNumReduceTasks(0);
        conf.setOutputCommitter(TestEmptyJob.CommitterWithDelayCleanup.class);
        fileSys.delete(this.outputDir, false);
        // presumably 30 maps and 0 reduces — confirm against UtilsForTests.runJob
        RunningJob job1 = UtilsForTests.runJob(conf, this.inDir, this.outputDir, 30, 0);
        // NOTE(review): the committer set on `conf` here does not appear to reach job 2,
        // which is built below from a fresh mr.createJobConf() — confirm whether intended.
        conf.setNumReduceTasks(0);
        conf.setOutputCommitter(CommitterWithDelaySetup.class);
        Path inDir2 = new Path(testDir, "input2");
        fileSys.mkdirs(inDir2);
        Path outDir2 = new Path(testDir, "output2");
        fileSys.delete(outDir2, false);
        // Job 2: a waiting job with 10 maps and no reduces, with its own input/output dirs.
        JobConf newConf = TestJobTrackerRestart.getJobs(mr.createJobConf(), new JobPriority[]{JobPriority.NORMAL}, new int[]{10}, new int[]{0}, outDir2, inDir2, TestJobTrackerRestart.getMapSignalFile(shareDir), TestJobTrackerRestart.getReduceSignalFile(shareDir))[0];
        JobClient jobClient = new JobClient(newConf);
        RunningJob job2 = jobClient.submitJob(newConf);
        JobID id = job2.getID();
        JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
        mr.getJobTrackerRunner().getJobTracker().initJob(jip);
        // Work out where job 2's history file and its companion "_conf.xml" file live.
        String history = JobHistory.JobInfo.getJobHistoryFileName((JobConf)jip.getJobConf(), (JobID)id);
        Path historyPath = JobHistory.JobInfo.getJobHistoryLogLocation((String)history);
        // The conf file name is built from the first two "_"-separated parts of the history name plus the job id.
        String[] parts = history.split("_");
        String jobUniqueString = parts[0] + "_" + parts[1] + "_" + id;
        Path confPath = new Path(historyPath.getParent(), jobUniqueString + "_conf.xml");
        // Wait until job 2 has at least one running map.
        while (jip.runningMaps() == 0) {
            UtilsForTests.waitFor(100L);
        }
        // Switch to job 1 and wait until its cleanup has been launched.
        id = job1.getID();
        jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
        mr.getJobTrackerRunner().getJobTracker().initJob(jip);
        while (!jip.isCleanupLaunched()) {
            UtilsForTests.waitFor(100L);
        }
        mr.stopJobTracker();
        // Replace job 2's history file with an empty file while the tracker is down.
        FileSystem historyFS = historyPath.getFileSystem((Configuration)conf);
        historyFS.delete(historyPath, false);
        historyFS.create(historyPath).close();
        // Signal the waiting tasks, then restart the tracker with recovery enabled.
        UtilsForTests.signalTasks(dfs, (FileSystem)fileSys, TestJobTrackerRestart.getMapSignalFile(shareDir), TestJobTrackerRestart.getReduceSignalFile(shareDir), 1);
        mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover", true);
        mr.startJobTracker();
        job1.waitForCompletion();
        job2.waitForCompletion();
        // After both jobs finish, the old history file and conf file must not exist.
        TestJobTrackerRestart.assertFalse((String)"Old jobhistory file is not deleted", (boolean)historyFS.exists(historyPath));
        TestJobTrackerRestart.assertFalse((String)"Old jobconf file is not deleted", (boolean)historyFS.exists(confPath));
    }

    /**
     * Test entry point: brings up a mini DFS cluster and a mini MR cluster,
     * runs the three job-tracker restart scenarios against them in sequence
     * and finally shuts both clusters down.
     *
     * @throws IOException if cluster setup or any scenario fails
     */
    public void testJobTrackerRestart() throws IOException {
        MiniDFSCluster dfs = null;
        MiniMRCluster mr = null;
        try {
            Configuration conf = new Configuration();
            conf.setBoolean("dfs.replication.considerLoad", false);
            dfs = new MiniDFSCluster(conf, 1, true, null, null);
            dfs.waitActive();

            // Start from an empty test directory and recreate the input dir.
            FileSystem fileSys = dfs.getFileSystem();
            fileSys.delete(testDir, true);
            if (!fileSys.mkdirs(this.inDir)) {
                throw new IOException("Mkdirs failed to create " + this.inDir.toString());
            }
            UtilsForTests.writeFile(dfs.getNameNode(), conf, new Path(this.inDir + "/file"), (short)1);

            dfs.startDataNodes(conf, 1, true, null, null, null, null);
            dfs.waitActive();

            String namenode = dfs.getFileSystem().getUri().getHost() + ":" + dfs.getFileSystem().getUri().getPort();

            // Job tracker settings: small history block/buffer sizes, one reduce
            // slot per tracker, a 25s tracker expiry interval, and ACLs enabled.
            JobConf jtConf = new JobConf();
            jtConf.set("mapred.jobtracker.job.history.block.size", "1024");
            jtConf.set("mapred.jobtracker.job.history.buffer.size", "1024");
            jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
            jtConf.setLong("mapred.tasktracker.expiry.interval", 25000L);
            jtConf.setBoolean("mapred.acls.enabled", true);

            // Allow the current user to submit jobs to the default queue.
            UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
            jtConf.set(QueueManager.toFullPropertyName("default", QueueManager.QueueACL.SUBMIT_JOB.getAclName()), ugi.getUserName());

            mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);

            this.testTaskEventsAndReportsWithRecovery(dfs, mr);
            this.testRestartWithoutRecovery(dfs, mr);
            this.testJobRecoveryWithEmptyHistory(dfs, mr);
        }
        finally {
            if (mr != null) {
                try {
                    mr.shutdown();
                }
                catch (Exception ignored) {
                    // Best-effort teardown: a shutdown failure must not mask the
                    // test outcome or prevent the DFS cluster from being stopped.
                }
            }
            if (dfs != null) {
                try {
                    dfs.shutdown();
                }
                catch (Exception ignored) {
                    // Best-effort teardown: a shutdown failure must not mask the test outcome.
                }
            }
        }
    }

    /**
     * Returns the path string of the map signal file inside {@code dir}.
     *
     * @param dir directory that contains the signal file
     * @return string form of {@code dir}/jt-restart-map-signal
     */
    private static String getMapSignalFile(Path dir) {
        Path mapSignal = new Path(dir, "jt-restart-map-signal");
        return mapSignal.toString();
    }

    /**
     * Returns the path string of the reduce signal file inside {@code dir}.
     *
     * @param dir directory that contains the signal file
     * @return string form of {@code dir}/jt-restart-reduce-signal
     */
    private static String getReduceSignalFile(Path dir) {
        Path reduceSignal = new Path(dir, "jt-restart-reduce-signal");
        return reduceSignal.toString();
    }

    /**
     * Runs the job tracker restart test directly, without a JUnit runner.
     *
     * @param args ignored
     * @throws IOException if the test fails with an I/O error
     */
    public static void main(String[] args) throws IOException {
        TestJobTrackerRestart test = new TestJobTrackerRestart();
        test.testJobTrackerRestart();
    }

    static class CommitterWithDelaySetup
    extends FileOutputCommitter {
        CommitterWithDelaySetup() {
        }

        public void setupJob(JobContext context) throws IOException {
            FileSystem fs = FileSystem.get((Configuration)context.getConfiguration());
            while (!fs.exists(shareDir)) {
                UtilsForTests.waitFor(100L);
            }
            super.cleanupJob(context);
        }
    }
}

