/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskMemoryManagerThread;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.TestProcfsBasedProcessTree;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TestTaskTrackerMemoryManager
extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
    private static String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "/tmp")).toString().replace(' ', '+');
    private MiniMRCluster miniMRCluster;
    private String taskOverLimitPatternString = "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond memory-limits. Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";

    private void startCluster(JobConf conf) throws Exception {
        conf.set("mapred.job.tracker.handler.count", "1");
        conf.set("mapred.tasktracker.map.tasks.maximum", "1");
        conf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
        conf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
        this.miniMRCluster = new MiniMRCluster(1, "file:///", 1, null, null, conf);
    }

    protected void tearDown() {
        if (this.miniMRCluster != null) {
            this.miniMRCluster.shutdown();
        }
    }

    private void runSleepJob(JobConf conf) throws Exception {
        String[] args = new String[]{"-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000"};
        ToolRunner.run((Configuration)conf, (Tool)new SleepJob(), (String[])args);
    }

    private void runAndCheckSuccessfulJob(JobConf conf) throws IOException {
        TaskCompletionEvent[] taskComplEvents;
        Pattern taskOverLimitPattern = Pattern.compile(String.format(this.taskOverLimitPatternString, "[0-9]*"));
        Matcher mat = null;
        boolean success = true;
        try {
            this.runSleepJob(conf);
            success = true;
        }
        catch (Exception e) {
            success = false;
        }
        TestTaskTrackerMemoryManager.assertTrue((boolean)success);
        JobClient jClient = new JobClient(conf);
        JobStatus[] jStatus = jClient.getAllJobs();
        JobStatus js = jStatus[0];
        RunningJob rj = jClient.getJob(js.getJobID());
        for (TaskCompletionEvent tce : taskComplEvents = rj.getTaskCompletionEvents(0)) {
            String[] diagnostics = rj.getTaskDiagnostics(tce.getTaskAttemptId());
            if (diagnostics == null) continue;
            for (String str : diagnostics) {
                mat = taskOverLimitPattern.matcher(str);
                TestTaskTrackerMemoryManager.assertFalse((boolean)mat.find());
            }
        }
    }

    private boolean isProcfsBasedTreeAvailable() {
        try {
            if (!ProcfsBasedProcessTree.isAvailable()) {
                LOG.info((Object)"Currently ProcessTree has only one implementation ProcfsBasedProcessTree, which is not available on this system. Not testing");
                return false;
            }
        }
        catch (Exception e) {
            LOG.info((Object)StringUtils.stringifyException((Throwable)e));
            return false;
        }
        return true;
    }

    public void testTTLimitsDisabled() throws Exception {
        if (!this.isProcfsBasedTreeAvailable()) {
            return;
        }
        this.startCluster(new JobConf());
        long PER_TASK_LIMIT = 1L;
        JobConf conf = this.miniMRCluster.createJobConf();
        conf.setMemoryForMapTask(PER_TASK_LIMIT);
        conf.setMemoryForReduceTask(PER_TASK_LIMIT);
        this.runAndCheckSuccessfulJob(conf);
    }

    public void testTasksWithinLimits() throws Exception {
        if (!this.isProcfsBasedTreeAvailable()) {
            return;
        }
        long PER_TASK_LIMIT = 2048L;
        JobConf fConf = new JobConf();
        fConf.setLong("mapred.cluster.map.memory.mb", 2048L);
        fConf.setLong("mapred.cluster.reduce.memory.mb", 2048L);
        this.startCluster(new JobConf());
        JobConf conf = new JobConf((Configuration)this.miniMRCluster.createJobConf());
        conf.setMemoryForMapTask(PER_TASK_LIMIT);
        conf.setMemoryForReduceTask(PER_TASK_LIMIT);
        this.runAndCheckSuccessfulJob(conf);
    }

    public void testTasksBeyondLimits() throws Exception {
        if (!this.isProcfsBasedTreeAvailable()) {
            return;
        }
        JobConf fConf = new JobConf();
        fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
        fConf.setLong("mapred.cluster.map.memory.mb", 2048L);
        fConf.setLong("mapred.cluster.reduce.memory.mb", 2048L);
        this.startCluster(fConf);
        this.runJobExceedingMemoryLimit();
    }

    public void testTaskMemoryMonitoringWithDeprecatedConfiguration() throws Exception {
        if (!this.isProcfsBasedTreeAvailable()) {
            return;
        }
        JobConf fConf = new JobConf();
        fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
        fConf.setLong("mapred.task.default.maxvmem", 0x80000000L);
        fConf.setLong("mapred.task.limit.maxvmem", 0xC0000000L);
        this.startCluster(fConf);
        this.runJobExceedingMemoryLimit();
    }

    private void runJobExceedingMemoryLimit() throws IOException {
        TaskCompletionEvent[] taskComplEvents;
        long PER_TASK_LIMIT = 1L;
        Pattern taskOverLimitPattern = Pattern.compile(String.format(this.taskOverLimitPatternString, String.valueOf(PER_TASK_LIMIT * 1024L * 1024L)));
        Matcher mat = null;
        JobConf conf = new JobConf((Configuration)this.miniMRCluster.createJobConf());
        conf.setMemoryForMapTask(PER_TASK_LIMIT);
        conf.setMemoryForReduceTask(PER_TASK_LIMIT);
        conf.setMaxMapAttempts(1);
        conf.setMaxReduceAttempts(1);
        boolean success = true;
        try {
            this.runSleepJob(conf);
            success = true;
        }
        catch (Exception e) {
            success = false;
        }
        TestTaskTrackerMemoryManager.assertFalse((boolean)success);
        JobClient jClient = new JobClient(conf);
        JobStatus[] jStatus = jClient.getAllJobs();
        JobStatus js = jStatus[0];
        RunningJob rj = jClient.getJob(js.getJobID());
        for (TaskCompletionEvent tce : taskComplEvents = rj.getTaskCompletionEvents(0)) {
            assert (tce.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED || tce.getTaskStatus() == TaskCompletionEvent.Status.FAILED);
            String[] diagnostics = rj.getTaskDiagnostics(tce.getTaskAttemptId());
            assert (diagnostics != null);
            for (String str : diagnostics) {
                mat = taskOverLimitPattern.matcher(str);
                TestTaskTrackerMemoryManager.assertTrue((boolean)mat.find());
            }
        }
    }

    public void testTasksCumulativelyExceedingTTLimits() throws Exception {
        if (!this.isProcfsBasedTreeAvailable()) {
            return;
        }
        long PER_TASK_LIMIT = 102400L;
        JobConf fConf = new JobConf();
        fConf.setLong("mapred.cluster.map.memory.mb", 1L);
        fConf.setLong("mapred.cluster.reduce.memory.mb", 1L);
        long TASK_TRACKER_LIMIT = 0x200000L;
        fConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
        this.startCluster(fConf);
        Pattern taskOverLimitPattern = Pattern.compile(String.format(this.taskOverLimitPatternString, String.valueOf(PER_TASK_LIMIT)));
        Pattern trackerOverLimitPattern = Pattern.compile("Killing one of the least progress tasks - .*, as the cumulative memory usage of all the tasks on the TaskTracker host0.foo.com exceeds virtual memory limit " + TASK_TRACKER_LIMIT + ".");
        Matcher mat = null;
        JobConf conf = new JobConf((Configuration)this.miniMRCluster.createJobConf());
        conf.setMemoryForMapTask(PER_TASK_LIMIT);
        conf.setMemoryForReduceTask(PER_TASK_LIMIT);
        JobClient jClient = new JobClient(conf);
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf((Configuration)conf);
        RunningJob job = jClient.submitJob(sleepJob.setupJobConf(1, 1, 5000L, 1, 1000L, 1));
        boolean TTOverFlowMsgPresent = false;
        while (true) {
            ArrayList<TaskReport> allTaskReports = new ArrayList<TaskReport>();
            allTaskReports.addAll(Arrays.asList(jClient.getSetupTaskReports(job.getID())));
            allTaskReports.addAll(Arrays.asList(jClient.getMapTaskReports(job.getID())));
            for (TaskReport tr : allTaskReports) {
                String[] diag;
                for (String str : diag = tr.getDiagnostics()) {
                    mat = taskOverLimitPattern.matcher(str);
                    TestTaskTrackerMemoryManager.assertFalse((boolean)mat.find());
                    mat = trackerOverLimitPattern.matcher(str);
                    if (!mat.find()) continue;
                    TTOverFlowMsgPresent = true;
                }
            }
            if (TTOverFlowMsgPresent) break;
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {}
        }
        job.killJob();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testProcessTreeLimits() throws IOException {
        File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
        String[] pids = new String[]{"100", "200", "300", "400", "500", "600", "700"};
        try {
            TestProcfsBasedProcessTree.setupProcfsRootDir(procfsRootDir);
            TestProcfsBasedProcessTree.setupPidDirs(procfsRootDir, pids);
            TestProcfsBasedProcessTree.ProcessStatInfo[] procs = new TestProcfsBasedProcessTree.ProcessStatInfo[]{new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"100", "proc1", "1", "100", "100", "100000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"200", "proc2", "1", "200", "200", "200000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"300", "proc3", "200", "200", "200", "300000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"400", "proc4", "200", "200", "200", "400000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"500", "proc5", "100", "100", "100", "1500000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"600", "proc6", "1", "600", "600", "100000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"700", "proc7", "600", "600", "600", "100000"})};
            TestProcfsBasedProcessTree.writeStatFiles(procfsRootDir, pids, procs);
            long limit = 700000L;
            TaskMemoryManagerThread test = new TaskMemoryManagerThread(1000000L, 5000L);
            ProcfsBasedProcessTree pTree = new ProcfsBasedProcessTree("100", procfsRootDir.getAbsolutePath());
            pTree.getProcessTree();
            TestTaskTrackerMemoryManager.assertTrue((String)"tree rooted at 100 should be over limit after first iteration.", (boolean)test.isProcessTreeOverLimit(pTree, "dummyId", limit));
            pTree = new ProcfsBasedProcessTree("200", procfsRootDir.getAbsolutePath());
            pTree.getProcessTree();
            TestTaskTrackerMemoryManager.assertFalse((String)"tree rooted at 200 shouldn't be over limit after one iteration.", (boolean)test.isProcessTreeOverLimit(pTree, "dummyId", limit));
            pTree.getProcessTree();
            TestTaskTrackerMemoryManager.assertTrue((String)"tree rooted at 200 should be over limit after 2 iterations", (boolean)test.isProcessTreeOverLimit(pTree, "dummyId", limit));
            pTree = new ProcfsBasedProcessTree("600", procfsRootDir.getAbsolutePath());
            pTree.getProcessTree();
            TestTaskTrackerMemoryManager.assertFalse((String)"tree rooted at 600 should never be over limit.", (boolean)test.isProcessTreeOverLimit(pTree, "dummyId", limit));
            pTree.getProcessTree();
            TestTaskTrackerMemoryManager.assertFalse((String)"tree rooted at 600 should never be over limit.", (boolean)test.isProcessTreeOverLimit(pTree, "dummyId", limit));
        }
        finally {
            FileUtil.fullyDelete((File)procfsRootDir);
        }
    }
}

