/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniHDFSCluster;
import org.apache.hadoop.mapred.JobChangeEvent;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobInProgressListener;
import org.apache.hadoop.mapred.JobQueueJobInProgressListener;
import org.apache.hadoop.mapred.JobQueueTaskScheduler;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobStatusChangeEvent;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskScheduler;
import org.apache.hadoop.mapred.UtilsForTests;

public class TestJobInProgressListener
extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestJobInProgressListener.class);
    private final Path testDir = new Path("test-jip-listener-update");
    private static String TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(' ', '+');

    private JobConf configureJob(JobConf conf, int m, int r, Path inDir, Path outputDir, String mapSignalFile, String redSignalFile) throws IOException {
        UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, m, r, "job-listener-test", mapSignalFile, redSignalFile);
        return conf;
    }

    public void testJobQueueChanges() throws IOException {
        LOG.info((Object)"Testing job queue changes");
        JobConf conf = new JobConf();
        MiniHDFSCluster dfs = new MiniHDFSCluster((Configuration)conf, 1, true, null, null);
        dfs.waitActive();
        DistributedFileSystem fileSys = dfs.getFileSystem();
        dfs.startDataNodes((Configuration)conf, 1, true, null, null, null, null);
        dfs.waitActive();
        String namenode = dfs.getFileSystem().getUri().getHost() + ":" + dfs.getFileSystem().getUri().getPort();
        MiniMRCluster mr = new MiniMRCluster(1, namenode, 1);
        JobClient jobClient = new JobClient(mr.createJobConf());
        fileSys.delete(this.testDir, true);
        if (!fileSys.mkdirs(this.testDir)) {
            throw new IOException("Mkdirs failed to create " + this.testDir.toString());
        }
        Path inDir = new Path(this.testDir, "input");
        Path shareDir = new Path(this.testDir, "share");
        String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
        String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
        UtilsForTests.writeFile(dfs.getNameNode(), (Configuration)conf, new Path(inDir + "/file"), (short)1);
        JobQueueJobInProgressListener myListener = new JobQueueJobInProgressListener();
        mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener((JobInProgressListener)myListener);
        Path outputDir = new Path(this.testDir, "output");
        Path newOutputDir = outputDir.suffix("0");
        JobConf job1 = this.configureJob(mr.createJobConf(), 10, 0, inDir, newOutputDir, mapSignalFile, redSignalFile);
        newOutputDir = outputDir.suffix("1");
        JobConf job2 = this.configureJob(mr.createJobConf(), 1, 0, inDir, newOutputDir, mapSignalFile, redSignalFile);
        RunningJob rJob1 = jobClient.submitJob(job1);
        LOG.info((Object)("Running job " + rJob1.getID().toString()));
        RunningJob rJob2 = jobClient.submitJob(job2);
        LOG.info((Object)("Running job " + rJob2.getID().toString()));
        LOG.info((Object)"Testing job priority changes");
        LOG.info((Object)"Increasing job2's priority to HIGH");
        rJob2.setJobPriority("HIGH");
        TestJobInProgressListener.assertTrue((String)"Priority change garbles the queue", (myListener.getJobQueue().size() == 2 ? 1 : 0) != 0);
        JobInProgress[] queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
        TestJobInProgressListener.assertTrue((String)"Priority change failed to bump up job2 in the queue", (boolean)queue[0].getJobID().equals((Object)rJob2.getID()));
        TestJobInProgressListener.assertTrue((String)"Priority change failed to bump down job1 in the queue", (boolean)queue[1].getJobID().equals((Object)rJob1.getID()));
        TestJobInProgressListener.assertEquals((String)"Priority change has garbled the queue", (int)2, (int)queue.length);
        LOG.info((Object)"Testing job start-time changes");
        LOG.info((Object)"Increasing job2's priority to NORMAL");
        rJob2.setJobPriority("NORMAL");
        JobInProgress jip2 = mr.getJobTrackerRunner().getJobTracker().getJob(rJob2.getID());
        JobInProgress jip1 = mr.getJobTrackerRunner().getJobTracker().getJob(rJob1.getID());
        JobStatus prevStatus = (JobStatus)jip2.getStatus().clone();
        jip2.startTime = jip1.startTime - 1L;
        jip2.status.setStartTime(jip2.startTime);
        JobStatus newStatus = (JobStatus)jip2.getStatus().clone();
        LOG.info((Object)"Updating the listener about job2's start-time change");
        JobStatusChangeEvent event = new JobStatusChangeEvent(jip2, JobStatusChangeEvent.EventType.START_TIME_CHANGED, prevStatus, newStatus);
        myListener.jobUpdated((JobChangeEvent)event);
        TestJobInProgressListener.assertTrue((String)"Start time change garbles the queue", (myListener.getJobQueue().size() == 2 ? 1 : 0) != 0);
        queue = myListener.getJobQueue().toArray(new JobInProgress[0]);
        TestJobInProgressListener.assertTrue((String)"Start time change failed to bump up job2 in the queue", (boolean)queue[0].getJobID().equals((Object)rJob2.getID()));
        TestJobInProgressListener.assertTrue((String)"Start time change failed to bump down job1 in the queue", (boolean)queue[1].getJobID().equals((Object)rJob1.getID()));
        TestJobInProgressListener.assertEquals((String)"Start time change has garbled the queue", (int)2, (int)queue.length);
        UtilsForTests.signalTasks(dfs, (FileSystem)fileSys, true, mapSignalFile, redSignalFile);
        while (rJob2.getJobState() != 2) {
            UtilsForTests.waitFor(10L);
        }
        while (rJob1.getJobState() != 2) {
            UtilsForTests.waitFor(10L);
        }
        TestJobInProgressListener.assertTrue((String)"Job completion garbles the queue", (myListener.getJobQueue().size() == 0 ? 1 : 0) != 0);
    }

    public void testJobFailure() throws Exception {
        LOG.info((Object)"Testing job-success");
        MyListener myListener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
        JobConf job = mr.createJobConf();
        mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener((JobInProgressListener)myListener);
        Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/input");
        Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerfailjob/output");
        RunningJob rJob = UtilsForTests.runJobFail(job, inDir, outDir);
        JobID id = rJob.getID();
        TestJobInProgressListener.assertFalse((String)"Missing event notification on failing a running job", (boolean)myListener.contains(id));
    }

    public void testJobKill() throws Exception {
        LOG.info((Object)"Testing job-kill");
        MyListener myListener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
        JobConf job = mr.createJobConf();
        mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener((JobInProgressListener)myListener);
        Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/input");
        Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerkilljob/output");
        RunningJob rJob = UtilsForTests.runJobKill(job, inDir, outDir);
        JobID id = rJob.getID();
        TestJobInProgressListener.assertFalse((String)"Missing event notification on killing a running job", (boolean)myListener.contains(id));
    }

    public void testJobSuccess() throws Exception {
        LOG.info((Object)"Testing job-success");
        MyListener myListener = new MyListener();
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1);
        JobConf job = mr.createJobConf();
        mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener((JobInProgressListener)myListener);
        Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
        Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
        RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
        while (rJob.getJobState() != 1) {
            UtilsForTests.waitFor(10L);
        }
        LOG.info((Object)("Job " + rJob.getID().toString() + " started running"));
        TestJobInProgressListener.assertFalse((String)"Missing event notification for a running job", (boolean)myListener.contains(rJob.getID(), true));
        while (rJob.getJobState() != 2) {
            UtilsForTests.waitFor(10L);
        }
        TestJobInProgressListener.assertFalse((String)"Missing event notification for a successful job", (boolean)myListener.contains(rJob.getID(), false));
    }

    public void testQueuedJobKill() throws Exception {
        LOG.info((Object)"Testing queued-job-kill");
        MyListener myListener = new MyListener();
        JobConf job = new JobConf();
        job.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class, TaskScheduler.class);
        MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, job);
        job = mr.createJobConf();
        mr.getJobTrackerRunner().getJobTracker().addJobInProgressListener((JobInProgressListener)myListener);
        Path inDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/input");
        Path outDir = new Path(TEST_ROOT_DIR + "/jiplistenerjob/output");
        RunningJob rJob = UtilsForTests.runJob(job, inDir, outDir);
        JobID id = rJob.getID();
        LOG.info((Object)("Job : " + id.toString() + " submitted"));
        TestJobInProgressListener.assertTrue((String)"Missing event notification on submiting a job", (boolean)myListener.contains(id, true));
        LOG.info((Object)("Killing job : " + id.toString()));
        rJob.killJob();
        TestJobInProgressListener.assertEquals((String)"Job status doesnt reflect the kill-job action", (int)5, (int)rJob.getJobState());
        TestJobInProgressListener.assertFalse((String)"Missing event notification on killing a waiting job", (boolean)myListener.contains(id, true));
    }

    public static class MyScheduler
    extends JobQueueTaskScheduler {
        public synchronized void start() throws IOException {
            super.start();
            this.taskTrackerManager.removeJobInProgressListener((JobInProgressListener)this.eagerTaskInitializationListener);
            this.eagerTaskInitializationListener.terminate();
        }
    }

    public static class MyListener
    extends JobInProgressListener {
        private List<JobInProgress> wjobs = new ArrayList<JobInProgress>();
        private List<JobInProgress> jobs = new ArrayList<JobInProgress>();

        public boolean contains(JobID id) {
            return this.contains(id, true) || this.contains(id, false);
        }

        public boolean contains(JobID id, boolean waiting) {
            List<JobInProgress> queue = waiting ? this.wjobs : this.jobs;
            for (JobInProgress job : queue) {
                if (!job.getJobID().equals((Object)id)) continue;
                return true;
            }
            return false;
        }

        public void jobAdded(JobInProgress job) {
            LOG.info((Object)("Job " + job.getJobID().toString() + " added"));
            this.wjobs.add(job);
        }

        public void jobRemoved(JobInProgress job) {
            LOG.info((Object)("Job " + job.getJobID().toString() + " removed"));
        }

        public void jobUpdated(JobChangeEvent event) {
            JobStatusChangeEvent statusEvent;
            LOG.info((Object)("Job " + event.getJobInProgress().getJobID().toString() + " updated"));
            if (event instanceof JobStatusChangeEvent && (statusEvent = (JobStatusChangeEvent)event).getEventType() == JobStatusChangeEvent.EventType.RUN_STATE_CHANGED) {
                JobInProgress jip = event.getJobInProgress();
                String jobId = jip.getJobID().toString();
                if (jip.isComplete()) {
                    LOG.info((Object)("Job " + jobId + " deleted from the running queue"));
                    if (statusEvent.getOldStatus().getRunState() == 4) {
                        this.wjobs.remove(jip);
                    } else {
                        this.jobs.remove(jip);
                    }
                } else {
                    LOG.info((Object)("Job " + jobId + " deleted from the waiting queue"));
                    this.wjobs.remove(jip);
                    this.jobs.add(jip);
                }
            }
        }
    }
}

