/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import javax.net.SocketFactory;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers;
import org.apache.hadoop.mapred.UtilsForTests;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Job-submission tests that run against in-process mini clusters.
 *
 * <p>The tests cover three things: rejection of jobs whose memory settings
 * are invalid for the cluster, rejection of a job when
 * {@code mapred.user.jobconf.limit} is set to 1024, and access checks on the
 * JobTracker system directory and a job's submit directory when a job is
 * submitted by one user and inspected by another.
 *
 * <p>Every test that starts a cluster shuts it down in a {@code finally}
 * block so that a failing assertion does not leave a cluster running.
 */
public class TestSubmitJob extends TestCase {
    static final Log LOG = LogFactory.getLog(TestSubmitJob.class);

    /** Directory under which the map/reduce signal files are created. */
    private static final Path TEST_DIR = new Path(System.getProperty("test.build.data", "/tmp"), "job-submission-testing");

    /** Number of slaves passed to the mini clusters started by {@link #startCluster()}. */
    private static final int NUM_SLAVES = 1;

    private MiniMRCluster mrCluster;
    private MiniDFSCluster dfsCluster;
    private JobTracker jt;
    private FileSystem fs;

    /**
     * Starts a mini DFS cluster and a mini MR cluster on top of it, and
     * caches the JobTracker and a FileSystem handle in the instance fields.
     *
     * @throws Exception if either cluster fails to start
     */
    private void startCluster() throws Exception {
        super.setUp();
        Configuration conf = new Configuration();
        dfsCluster = new MiniDFSCluster(conf, NUM_SLAVES, true, null);
        JobConf jConf = new JobConf(conf);
        jConf.setLong("mapred.job.submission.expiry.interval", 6000L);
        mrCluster = new MiniMRCluster(0, 0, NUM_SLAVES, dfsCluster.getFileSystem().getUri().toString(), 1, null, null, null, jConf);
        jt = mrCluster.getJobTrackerRunner().getJobTracker();
        fs = FileSystem.get(mrCluster.createJobConf());
    }

    /**
     * Shuts down whatever {@link #startCluster()} started and clears the
     * cached handles. Safe to call when nothing (or only part) was started.
     *
     * @throws Exception if a cluster fails to shut down
     */
    private void stopCluster() throws Exception {
        try {
            shutdownMrCluster();
        } finally {
            // Always attempt the DFS shutdown, even if the MR shutdown threw.
            if (dfsCluster != null) {
                dfsCluster.shutdown();
                dfsCluster = null;
            }
            jt = null;
            fs = null;
        }
    }

    /** Shuts down the MR cluster held in {@link #mrCluster}, if any, and clears the field. */
    private void shutdownMrCluster() {
        if (mrCluster != null) {
            mrCluster.shutdown();
            mrCluster = null;
        }
    }

    /**
     * Submits jobs whose map/reduce memory settings are either only partially
     * specified or above the configured cluster maximum, and verifies that
     * each submission is rejected with the expected message.
     *
     * @throws Exception if the cluster cannot be started or a check fails
     */
    public void testJobWithInvalidMemoryReqs() throws Exception {
        JobConf jtConf = new JobConf();
        jtConf.setLong("mapred.cluster.map.memory.mb", 1024L);
        jtConf.setLong("mapred.cluster.reduce.memory.mb", 2048L);
        jtConf.setLong("mapred.cluster.max.map.memory.mb", 3072L);
        jtConf.setLong("mapred.cluster.max.reduce.memory.mb", 4096L);
        mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
        try {
            JobConf clusterConf = mrCluster.createJobConf();

            // Only the reduce memory is set; the map memory is left unset.
            JobConf jobConf = new JobConf(clusterConf);
            jobConf.setMemoryForReduceTask(1024L);
            runJobAndVerifyFailure(jobConf, -1L, 1024L, "Invalid job requirements.");

            // Only the map memory is set; the reduce memory is left unset.
            jobConf = new JobConf(clusterConf);
            jobConf.setMemoryForMapTask(1024L);
            runJobAndVerifyFailure(jobConf, 1024L, -1L, "Invalid job requirements.");

            // Map memory (4096) is above mapred.cluster.max.map.memory.mb (3072).
            jobConf = new JobConf(clusterConf);
            jobConf.setMemoryForMapTask(4096L);
            jobConf.setMemoryForReduceTask(1024L);
            runJobAndVerifyFailure(jobConf, 4096L, 1024L, "Exceeds the cluster's max-memory-limit.");

            // Reduce memory (5120) is above mapred.cluster.max.reduce.memory.mb (4096).
            jobConf = new JobConf(clusterConf);
            jobConf.setMemoryForMapTask(1024L);
            jobConf.setMemoryForReduceTask(5120L);
            runJobAndVerifyFailure(jobConf, 1024L, 5120L, "Exceeds the cluster's max-memory-limit.");
        } finally {
            shutdownMrCluster();
        }
    }

    /**
     * Starts a JobTracker with {@code mapred.user.jobconf.limit} set to 1024
     * and verifies that submitting a {@link SleepJob} is rejected with a
     * {@link RemoteException}.
     *
     * @throws Exception if the cluster cannot be started or a check fails
     */
    public void testJobWithInvalidDiskReqs() throws Exception {
        JobConf jtConf = new JobConf();
        jtConf.setLong("mapred.user.jobconf.limit", 1024L);
        mrCluster = new MiniMRCluster(0, "file:///", 0, null, null, jtConf);
        try {
            JobConf jobConf = new JobConf(mrCluster.createJobConf());
            String[] args = new String[]{"-m", "0", "-r", "0", "-mt", "0", "-rt", "0"};
            try {
                ToolRunner.run(jobConf, new SleepJob(), args);
                fail("Job submission was expected to be rejected with a RemoteException");
            } catch (RemoteException re) {
                // Expected outcome; record the exception for diagnosis only.
                LOG.info("Exception " + StringUtils.stringifyException(re));
            }
        } finally {
            shutdownMrCluster();
        }
    }

    /**
     * Runs a {@link SleepJob} with the given configuration and verifies that
     * submission fails with a {@link RemoteException} whose unwrapped message
     * contains the expected text.
     *
     * @param jobConf configuration to submit the job with
     * @param memForMapTasks map-task memory value expected in the message
     * @param memForReduceTasks reduce-task memory value expected in the message
     * @param expectedMsg trailing part of the expected failure message
     * @throws Exception if the job fails with anything other than a {@link RemoteException}
     */
    private void runJobAndVerifyFailure(JobConf jobConf, long memForMapTasks, long memForReduceTasks, String expectedMsg) throws Exception {
        String[] args = new String[]{"-m", "0", "-r", "0", "-mt", "0", "-rt", "0"};
        String msg = null;
        try {
            ToolRunner.run(jobConf, new SleepJob(), args);
            fail("Job submission was expected to fail with: " + expectedMsg);
        } catch (RemoteException re) {
            msg = re.unwrapRemoteException().getMessage();
        }
        assertNotNull("The remote exception carried no message", msg);
        String overallExpectedMsg = "(" + memForMapTasks + " memForMapTasks " + memForReduceTasks + " memForReduceTasks): " + expectedMsg;
        assertTrue("Observed message - " + msg + " - doesn't contain expected message - " + overallExpectedMsg, msg.contains(overallExpectedMsg));
    }

    /**
     * Creates an RPC proxy to the JobTracker named in {@code conf} that acts
     * as the given user.
     *
     * @param conf configuration from which the JobTracker address is read
     * @param ugi user on whose behalf calls are made
     * @return a job-submission proxy
     * @throws IOException if the proxy cannot be created
     */
    static JobSubmissionProtocol getJobSubmitClient(JobConf conf, UserGroupInformation ugi) throws IOException {
        // Use the interface's own version constant rather than a copied literal.
        return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, JobTracker.getAddress(conf), ugi, conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
    }

    /**
     * Creates an RPC proxy to the NameNode named in {@code conf} that acts as
     * the given user.
     *
     * @param conf configuration from which the NameNode address is read
     * @param ugi user on whose behalf calls are made
     * @return a DFS client-protocol proxy
     * @throws IOException if the proxy cannot be created
     */
    static ClientProtocol getDFSClient(Configuration conf, UserGroupInformation ugi) throws IOException {
        // Use the interface's own version constant rather than a copied literal.
        return (ClientProtocol) RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, NameNode.getAddress(conf), ugi, conf, NetUtils.getSocketFactory(conf, ClientProtocol.class));
    }

    /**
     * Submits a waiting job as {@code user1} and verifies that {@code user2}
     * gets "Permission denied" when listing the JobTracker system directory
     * and the job's submit directory. The job is then signalled to finish and
     * the submit directory is checked to have been removed.
     *
     * @throws Exception if a cluster cannot be started or a check fails
     */
    public void testSecureJobExecution() throws Exception {
        LOG.info("Testing secure job submission/execution");
        MiniMRCluster mr = null;
        ClientProtocol client = null;
        Configuration conf = new Configuration();
        final MiniDFSCluster dfs = new MiniDFSCluster(conf, 1, true, null);
        try {
            FileSystem fileSys = TestMiniMRWithDFSWithDistinctUsers.DFS_UGI.doAs(new PrivilegedExceptionAction<FileSystem>() {
                @Override
                public FileSystem run() throws IOException {
                    return dfs.getFileSystem();
                }
            });
            TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
            TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/mapred");
            TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging"));

            UserGroupInformation mrUgi = UserGroupInformation.getLoginUser();
            mr = new MiniMRCluster(0, 0, 1, dfs.getFileSystem().getUri().toString(), 1, null, null, mrUgi);
            JobTracker jobTracker = mr.getJobTrackerRunner().getJobTracker();

            // Start from a clean signal directory.
            dfs.getFileSystem().delete(TEST_DIR, true);
            Path mapSignalFile = new Path(TEST_DIR, "map-signal");
            Path reduceSignalFile = new Path(TEST_DIR, "reduce-signal");

            UserGroupInformation user1 = TestMiniMRWithDFSWithDistinctUsers.createUGI("user1", false);
            Path inDir = new Path("/user/input");
            Path outDir = new Path("/user/output");
            final JobConf job = mr.createJobConf();
            UtilsForTests.configureWaitingJobConf(job, inDir, outDir, 2, 0, "test-submit-job", mapSignalFile.toString(), reduceSignalFile.toString());
            job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
            job.set(UtilsForTests.getTaskSignalParameter(false), reduceSignalFile.toString());

            LOG.info("Submit job as the actual user (" + user1.getUserName() + ")");
            final JobClient jClient = user1.doAs(new PrivilegedExceptionAction<JobClient>() {
                @Override
                public JobClient run() throws IOException {
                    return new JobClient(job);
                }
            });
            RunningJob rJob = user1.doAs(new PrivilegedExceptionAction<RunningJob>() {
                @Override
                public RunningJob run() throws IOException {
                    return jClient.submitJob(job);
                }
            });
            JobID id = rJob.getID();
            LOG.info("Running job " + id);

            // A second, unrelated user talks to the NameNode directly.
            UserGroupInformation user2 = TestMiniMRWithDFSWithDistinctUsers.createUGI("user2", false);
            JobConf confOther = mr.createJobConf();
            client = getDFSClient(confOther, user2);

            try {
                String path = new URI(jobTracker.getSystemDir()).getPath();
                LOG.info("Try listing the mapred-system-dir as the user (" + user2.getUserName() + ")");
                client.getListing(path, HdfsFileStatus.EMPTY_NAME);
                fail("JobTracker system dir is accessible to others");
            } catch (IOException ioe) {
                assertTrue(ioe.toString(), ioe.toString().contains("Permission denied"));
            }

            JobInProgress jip = jobTracker.getJob(id);
            Path jobSubmitDirpath = new Path(jip.getJobConf().get("mapreduce.job.dir"));
            try {
                LOG.info("Try accessing the job folder for job " + id + " as the user (" + user2.getUserName() + ")");
                client.getListing(jobSubmitDirpath.toUri().getPath(), HdfsFileStatus.EMPTY_NAME);
                fail("User's staging folder is accessible to others");
            } catch (IOException ioe) {
                assertTrue(ioe.toString(), ioe.toString().contains("Permission denied"));
            }

            // Let the waiting tasks finish, then check the submit dir is gone.
            UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile.toString(), reduceSignalFile.toString());
            UtilsForTests.waitTillDone(jClient);
            LOG.info("Check if job submit dir is cleanup or not");
            assertFalse(fileSys.exists(jobSubmitDirpath));
        } finally {
            // Release resources in reverse order of creation; each step runs
            // even if the previous one throws.
            try {
                if (client != null) {
                    RPC.stopProxy(client);
                }
            } finally {
                try {
                    if (mr != null) {
                        mr.shutdown();
                    }
                } finally {
                    dfs.shutdown();
                }
            }
        }
    }
}

