/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.mapred.tools.MRAdmin;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestJobTrackerQuiescence {
    final Path testDir = new Path(System.getProperty("test.build.data", "/tmp"), "jt-safemode");
    final Path inDir = new Path(this.testDir, "input");
    final Path shareDir = new Path(this.testDir, "share");
    final Path outputDir = new Path(this.testDir, "output");
    final int maxMapTasks = 1;
    private MiniDFSCluster dfs;
    private MiniMRCluster mr;
    private FileSystem fileSys;
    private JobTracker jt;
    private static final Log LOG = LogFactory.getLog(TestJobTrackerQuiescence.class);

    @Before
    public void setUp() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.replication.considerLoad", false);
        this.dfs = new MiniDFSCluster(conf, 1, true, null, null);
        this.dfs.waitActive();
        this.fileSys = this.dfs.getFileSystem();
        this.fileSys.delete(this.testDir, true);
        if (!this.fileSys.mkdirs(this.inDir)) {
            throw new IOException("Mkdirs failed to create " + this.inDir.toString());
        }
        UtilsForTests.writeFile(this.dfs.getNameNode(), conf, new Path(this.inDir + "/file"), (short)1);
        this.dfs.startDataNodes(conf, 1, true, null, null, null, null);
        this.dfs.waitActive();
        String namenode = this.dfs.getFileSystem().getUri().getHost() + ":" + this.dfs.getFileSystem().getUri().getPort();
        JobConf jtConf = new JobConf();
        jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
        jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
        jtConf.setBoolean("mapreduce.jt.hdfs.monitor.enable", true);
        jtConf.setInt("mapreduce.jt.hdfs.monitor.interval.ms", 1000);
        this.mr = new MiniMRCluster(1, namenode, 1, null, null, jtConf);
        this.mr.waitUntilIdle();
        this.mr.setInlineCleanupThreads();
        this.jt = this.mr.getJobTrackerRunner().getJobTracker();
    }

    @After
    public void tearDown() {
        if (this.mr != null) {
            try {
                this.mr.shutdown();
            }
            catch (Exception e) {
                // empty catch block
            }
        }
        if (this.dfs != null) {
            try {
                this.dfs.shutdown();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    @Test
    public void testHDFSMonitor() throws Exception {
        int numTries;
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
        for (numTries = 20; !this.jt.isInSafeMode() && numTries > 0; --numTries) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((boolean)true, (boolean)this.jt.isInSafeMode());
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
        for (numTries = 20; this.jt.isInSafeMode() && numTries > 0; --numTries) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((boolean)false, (boolean)this.jt.isInSafeMode());
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
        for (numTries = 20; !this.jt.isInSafeMode() && numTries > 0; --numTries) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((boolean)true, (boolean)this.jt.isInSafeMode());
        this.enterSafeMode();
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
        for (numTries = 20; this.jt.isInSafeMode() && numTries > 0; --numTries) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((boolean)true, (boolean)this.jt.isInSafeMode());
        Assert.assertEquals((boolean)true, (boolean)this.jt.isInAdminSafeMode());
        this.leaveSafeMode();
        Assert.assertEquals((boolean)false, (boolean)this.jt.isInAdminSafeMode());
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_ENTER);
        Thread.sleep(5000L);
        this.dfs.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
        for (numTries = 20; this.jt.isInSafeMode() && numTries > 0; --numTries) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals((boolean)false, (boolean)this.jt.isInSafeMode());
    }

    @Test
    public void testMRAdminSafeModeWait() throws Exception {
        this.enterSafeMode();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Void> future = executor.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                MRAdmin mrAdmin = new MRAdmin((Configuration)TestJobTrackerQuiescence.this.mr.createJobConf());
                mrAdmin.run(new String[]{"-safemode", "wait"});
                return null;
            }
        });
        try {
            future.get(1L, TimeUnit.SECONDS);
            Assert.fail((String)"JT should still be in safemode");
        }
        catch (TimeoutException e) {
            // empty catch block
        }
        this.leaveSafeMode();
        try {
            future.get(10L, TimeUnit.SECONDS);
        }
        catch (TimeoutException e) {
            Assert.fail((String)"JT should no longer be in safemode");
        }
    }

    @Test
    public void testJobsPauseInSafeMode() throws Exception {
        FileSystem fileSys = this.dfs.getFileSystem();
        JobConf jobConf = this.mr.createJobConf();
        int numMaps = 10;
        int numReds = 1;
        String mapSignalFile = UtilsForTests.getMapSignalFile(this.shareDir);
        String redSignalFile = UtilsForTests.getReduceSignalFile(this.shareDir);
        jobConf.set("user.name", UserGroupInformation.getCurrentUser().getUserName());
        JobConf job = this.configureJob(jobConf, numMaps, numReds, mapSignalFile, redSignalFile);
        fileSys.delete(this.shareDir, true);
        JobClient jobClient = new JobClient(job);
        RunningJob rJob = jobClient.submitJob(job);
        JobID id = rJob.getID();
        this.mr.initializeJob(id);
        while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() < 0.5f) {
            UtilsForTests.waitFor(10L);
        }
        Assert.assertEquals((int)(numMaps / 2), (int)this.getCompletedMapCount(rJob));
        this.enterSafeMode();
        UtilsForTests.signalTasks(this.dfs, fileSys, true, mapSignalFile, redSignalFile);
        UtilsForTests.signalTasks(this.dfs, fileSys, false, mapSignalFile, redSignalFile);
        Thread.sleep(10000L);
        Assert.assertEquals((int)(numMaps / 2 + 1), (int)this.getCompletedMapCount(rJob));
        this.leaveSafeMode();
        UtilsForTests.waitTillDone(jobClient);
        Assert.assertTrue((boolean)rJob.isSuccessful());
    }

    private int getCompletedMapCount(RunningJob rJob) throws IOException {
        TaskCompletionEvent[] taskCompletionEvents = rJob.getTaskCompletionEvents(0);
        int mapCount = 0;
        for (TaskCompletionEvent tce : taskCompletionEvents) {
            if (!tce.isMap) continue;
            ++mapCount;
        }
        return mapCount;
    }

    private JobConf configureJob(JobConf conf, int maps, int reduces, String mapSignal, String redSignal) throws IOException {
        UtilsForTests.configureWaitingJobConf(conf, this.inDir, this.outputDir, maps, reduces, "test-jt-safemode", mapSignal, redSignal);
        return conf;
    }

    private void enterSafeMode() throws IOException {
        this.jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_ENTER);
    }

    private void leaveSafeMode() throws IOException {
        this.jt.setSafeMode(JobTracker.SafeModeAction.SAFEMODE_LEAVE);
    }
}

