package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.regex.Pattern;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.util.LinuxResourceCalculatorPlugin;
import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
import org.apache.hadoop.mapreduce.util.TestProcfsBasedProcessTree;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;

/* loaded from: input_file:org/apache/hadoop/mapred/TestTaskTrackerMemoryManager.class */
public class TestTaskTrackerMemoryManager extends TestCase {
    private static final Log LOG = LogFactory.getLog(TestTaskTrackerMemoryManager.class);
    private static String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data", "/tmp")).toString().replace(' ', '+');
    private MiniMRCluster miniMRCluster;
    private String taskOverLimitPatternString = "TaskTree \\[pid=[0-9]*,tipID=.*\\] is running beyond.*memory-limits. Current usage : [0-9]*bytes. Limit : %sbytes. Killing task.";

    private void startCluster(JobConf jobConf) throws Exception {
        jobConf.set("mapred.job.tracker.handler.count", "1");
        jobConf.set("mapred.tasktracker.map.tasks.maximum", "1");
        jobConf.set("mapred.tasktracker.reduce.tasks.maximum", "1");
        jobConf.set("mapred.tasktracker.tasks.sleeptime-before-sigkill", "0");
        this.miniMRCluster = new MiniMRCluster(1, "file:///", 1, (String[]) null, (String[]) null, jobConf);
    }

    protected void tearDown() {
        if (this.miniMRCluster != null) {
            this.miniMRCluster.shutdown();
        }
    }

    private void runSleepJob(JobConf jobConf) throws Exception {
        ToolRunner.run(jobConf, new SleepJob(), new String[]{"-m", "3", "-r", "1", "-mt", "3000", "-rt", "1000"});
    }

    private void runAndCheckSuccessfulJob(JobConf jobConf) throws IOException {
        boolean z;
        Pattern compile = Pattern.compile(String.format(this.taskOverLimitPatternString, "[0-9]*"));
        try {
            runSleepJob(jobConf);
            z = true;
        } catch (Exception e) {
            z = false;
        }
        assertTrue(z);
        JobClient jobClient = new JobClient(jobConf);
        RunningJob job = jobClient.getJob(jobClient.getAllJobs()[0].getJobID());
        for (TaskCompletionEvent taskCompletionEvent : job.getTaskCompletionEvents(0)) {
            String[] taskDiagnostics = job.getTaskDiagnostics(taskCompletionEvent.getTaskAttemptId());
            if (taskDiagnostics != null) {
                for (String str : taskDiagnostics) {
                    assertFalse(compile.matcher(str).find());
                }
            }
        }
    }

    private boolean isProcfsBasedTreeAvailable() {
        try {
            if (ProcfsBasedProcessTree.isAvailable()) {
                return true;
            }
            LOG.info("Currently ProcessTree has only one implementation ProcfsBasedProcessTree, which is not available on this system. Not testing");
            return false;
        } catch (Exception e) {
            LOG.info(StringUtils.stringifyException(e));
            return false;
        }
    }

    public void testTTLimitsDisabled() throws Exception {
        if (isProcfsBasedTreeAvailable()) {
            startCluster(new JobConf());
            JobConf createJobConf = this.miniMRCluster.createJobConf();
            createJobConf.setMemoryForMapTask(1L);
            createJobConf.setMemoryForReduceTask(1L);
            runAndCheckSuccessfulJob(createJobConf);
        }
    }

    public void testTasksWithinLimits() throws Exception {
        if (isProcfsBasedTreeAvailable()) {
            JobConf jobConf = new JobConf();
            jobConf.setLong("mapred.cluster.map.memory.mb", 2048L);
            jobConf.setLong("mapred.cluster.reduce.memory.mb", 2048L);
            jobConf.setLong("mapreduce.tasktracker.reserved.physicalmemory.mb", 1L);
            startCluster(new JobConf());
            JobConf jobConf2 = new JobConf(this.miniMRCluster.createJobConf());
            jobConf2.setMemoryForMapTask(2048L);
            jobConf2.setMemoryForReduceTask(2048L);
            jobConf2.setLong("mapred.job.map.memory.physical.mb", 2048L);
            jobConf2.setLong("mapred.job.reduce.memory.mb", 2048L);
            runAndCheckSuccessfulJob(jobConf2);
        }
    }

    public void testTasksBeyondLimits() throws Exception {
        if (isProcfsBasedTreeAvailable()) {
            JobConf jobConf = new JobConf();
            jobConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
            jobConf.setLong("mapred.cluster.map.memory.mb", 2048L);
            jobConf.setLong("mapred.cluster.reduce.memory.mb", 2048L);
            startCluster(jobConf);
            runJobExceedingMemoryLimit(false);
        }
    }

    public void testTasksBeyondPhysicalLimits() throws Exception {
        if (isProcfsBasedTreeAvailable()) {
            JobConf jobConf = new JobConf();
            jobConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
            jobConf.setLong("mapreduce.tasktracker.reserved.physicalmemory.mb", 1L);
            startCluster(jobConf);
            runJobExceedingMemoryLimit(true);
        }
    }

    public void testTaskMemoryMonitoringWithDeprecatedConfiguration() throws Exception {
        if (isProcfsBasedTreeAvailable()) {
            JobConf jobConf = new JobConf();
            jobConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
            jobConf.setLong("mapred.task.default.maxvmem", 2147483648L);
            jobConf.setLong("mapred.task.limit.maxvmem", 3221225472L);
            startCluster(jobConf);
            runJobExceedingMemoryLimit(false);
        }
    }

    private void runJobExceedingMemoryLimit(boolean z) throws IOException {
        boolean z2;
        Pattern compile = Pattern.compile(String.format(this.taskOverLimitPatternString, String.valueOf(1 * 1024 * 1024)));
        JobConf jobConf = new JobConf(this.miniMRCluster.createJobConf());
        if (z) {
            jobConf.setLong("mapred.job.map.memory.physical.mb", 1L);
            jobConf.setLong("mapred.job.reduce.memory.mb", 1L);
        } else {
            jobConf.setMemoryForMapTask(1L);
            jobConf.setMemoryForReduceTask(1L);
        }
        jobConf.setMaxMapAttempts(1);
        jobConf.setMaxReduceAttempts(1);
        try {
            runSleepJob(jobConf);
            z2 = true;
        } catch (Exception e) {
            z2 = false;
        }
        assertFalse(z2);
        JobClient jobClient = new JobClient(jobConf);
        RunningJob job = jobClient.getJob(jobClient.getAllJobs()[0].getJobID());
        for (TaskCompletionEvent taskCompletionEvent : job.getTaskCompletionEvents(0)) {
            for (String str : job.getTaskDiagnostics(taskCompletionEvent.getTaskAttemptId())) {
                assertTrue(compile.matcher(str).find());
            }
        }
    }

    public void testTasksCumulativelyExceedingTTLimits() throws Exception {
        if (!isProcfsBasedTreeAvailable()) {
            return;
        }
        JobConf jobConf = new JobConf();
        jobConf.setLong("mapred.cluster.map.memory.mb", 1L);
        jobConf.setLong("mapred.cluster.reduce.memory.mb", 1L);
        jobConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
        startCluster(jobConf);
        Pattern compile = Pattern.compile(String.format(this.taskOverLimitPatternString, String.valueOf(102400L)));
        Pattern compile2 = Pattern.compile("Killing one of the least progress tasks - .*, as the cumulative memory usage of all the tasks on the TaskTracker .* exceeds virtual memory limit 2097152.");
        JobConf jobConf2 = new JobConf(this.miniMRCluster.createJobConf());
        jobConf2.setMemoryForMapTask(102400L);
        jobConf2.setMemoryForReduceTask(102400L);
        JobClient jobClient = new JobClient(jobConf2);
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(jobConf2);
        RunningJob submitJob = jobClient.submitJob(sleepJob.setupJobConf(1, 1, 5000L, 1, 1000L, 1));
        boolean z = false;
        while (true) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(Arrays.asList(jobClient.getSetupTaskReports(submitJob.getID())));
            arrayList.addAll(Arrays.asList(jobClient.getMapTaskReports(submitJob.getID())));
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                for (String str : ((TaskReport) it.next()).getDiagnostics()) {
                    assertFalse(compile.matcher(str).find());
                    if (compile2.matcher(str).find()) {
                        z = true;
                    }
                }
            }
            if (z) {
                submitJob.killJob();
                return;
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    }

    public void testProcessTreeLimits() throws IOException {
        File file = new File(TEST_ROOT_DIR, "proc");
        String[] strArr = {"100", "200", "300", "400", "500", "600", "700"};
        try {
            TestProcfsBasedProcessTree.setupProcfsRootDir(file);
            TestProcfsBasedProcessTree.setupPidDirs(file, strArr);
            TestProcfsBasedProcessTree.writeStatFiles(file, strArr, new TestProcfsBasedProcessTree.ProcessStatInfo[]{new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"100", "proc1", "1", "100", "100", "100000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"200", "proc2", "1", "200", "200", "200000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"300", "proc3", "200", "200", "200", "300000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"400", "proc4", "200", "200", "200", "400000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"500", "proc5", "100", "100", "100", "1500000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"600", "proc6", "1", "600", "600", "100000"}), new TestProcfsBasedProcessTree.ProcessStatInfo(new String[]{"700", "proc7", "600", "600", "600", "100000"})});
            TaskMemoryManagerThread taskMemoryManagerThread = new TaskMemoryManagerThread(1000000L, 5000L);
            ProcfsBasedProcessTree procfsBasedProcessTree = new ProcfsBasedProcessTree("100", file.getAbsolutePath());
            procfsBasedProcessTree.getProcessTree();
            assertTrue("tree rooted at 100 should be over limit after first iteration.", taskMemoryManagerThread.isProcessTreeOverLimit(procfsBasedProcessTree, "dummyId", 700000L));
            ProcfsBasedProcessTree procfsBasedProcessTree2 = new ProcfsBasedProcessTree("200", file.getAbsolutePath());
            procfsBasedProcessTree2.getProcessTree();
            assertFalse("tree rooted at 200 shouldn't be over limit after one iteration.", taskMemoryManagerThread.isProcessTreeOverLimit(procfsBasedProcessTree2, "dummyId", 700000L));
            procfsBasedProcessTree2.getProcessTree();
            assertTrue("tree rooted at 200 should be over limit after 2 iterations", taskMemoryManagerThread.isProcessTreeOverLimit(procfsBasedProcessTree2, "dummyId", 700000L));
            ProcfsBasedProcessTree procfsBasedProcessTree3 = new ProcfsBasedProcessTree("600", file.getAbsolutePath());
            procfsBasedProcessTree3.getProcessTree();
            assertFalse("tree rooted at 600 should never be over limit.", taskMemoryManagerThread.isProcessTreeOverLimit(procfsBasedProcessTree3, "dummyId", 700000L));
            procfsBasedProcessTree3.getProcessTree();
            assertFalse("tree rooted at 600 should never be over limit.", taskMemoryManagerThread.isProcessTreeOverLimit(procfsBasedProcessTree3, "dummyId", 700000L));
            FileUtil.fullyDelete(file);
        } catch (Throwable th) {
            FileUtil.fullyDelete(file);
            throw th;
        }
    }

    public void testTasksCumulativelyExceedingTTPhysicalLimits() throws Exception {
        if (!isProcfsBasedTreeAvailable()) {
            return;
        }
        JobConf jobConf = new JobConf();
        jobConf.set("mapred.tasktracker.taskmemorymanager.monitoring-interval", String.valueOf(300));
        long physicalMemorySize = new LinuxResourceCalculatorPlugin().getPhysicalMemorySize();
        long j = (physicalMemorySize / 1048576) + 1;
        jobConf.setLong("mapreduce.tasktracker.reserved.physicalmemory.mb", j);
        Pattern compile = Pattern.compile("Killing one of the memory-consuming tasks - .*, as the cumulative RSS memory usage of all the tasks on the TaskTracker exceeds physical memory limit " + (physicalMemorySize - ((j * 1024) * 1024)) + ".");
        startCluster(jobConf);
        JobConf jobConf2 = new JobConf(this.miniMRCluster.createJobConf());
        jobConf2.setLong("mapred.job.map.memory.mb", 2048L);
        jobConf2.setLong("mapred.job.reduce.memory.mb", 2048L);
        JobClient jobClient = new JobClient(jobConf2);
        SleepJob sleepJob = new SleepJob();
        sleepJob.setConf(jobConf2);
        RunningJob submitJob = jobClient.submitJob(sleepJob.setupJobConf(1, 1, 5000L, 1, 1000L, 1));
        boolean z = false;
        while (true) {
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(Arrays.asList(jobClient.getSetupTaskReports(submitJob.getID())));
            arrayList.addAll(Arrays.asList(jobClient.getMapTaskReports(submitJob.getID())));
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                for (String str : ((TaskReport) it.next()).getDiagnostics()) {
                    if (compile.matcher(str).find()) {
                        z = true;
                    }
                }
            }
            if (z) {
                submitJob.killJob();
                return;
            } else {
                assertFalse("Job should not finish successfully", submitJob.isSuccessful());
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }
    }
}
