/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.StandardLocation;
import javax.tools.ToolProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.BroadcastAndOneToOneExample;
import org.apache.tez.mapreduce.examples.ExampleDriver;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
import org.apache.tez.mapreduce.examples.UnionExample;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestMRRJobsDAGApi {
    private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
    protected static MiniTezCluster mrrTezCluster;
    protected static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static FileSystem remoteFs;
    private Random random = new Random();
    private static String TEST_ROOT_DIR;
    private static String RELOCALIZATION_TEST_CLASS_NAME;

    @BeforeClass
    public static void setup() throws IOException {
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        if (mrrTezCluster == null) {
            mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(), 1, 1, 1);
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", remoteFs.getUri().toString());
            conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
            mrrTezCluster.init(conf);
            mrrTezCluster.start();
        }
    }

    @AfterClass
    public static void tearDown() {
        if (mrrTezCluster != null) {
            mrrTezCluster.stop();
            mrrTezCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    @Test(timeout=60000L)
    public void testSleepJob() throws TezException, IOException, InterruptedException {
        SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
        DAG dag = DAG.create((String)"TezSleepProcessor");
        Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)1, (Resource)Resource.newInstance((int)1024, (int)1));
        dag.addVertex(vertex);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        TezClient tezSession = TezClient.create((String)"TezSleepProcessor", (TezConfiguration)tezConf, (boolean)false);
        tezSession.start();
        DAGClient dagClient = tezSession.submitDAG(dag);
        DAGStatus dagStatus = dagClient.getDAGStatus(null);
        while (!dagStatus.isCompleted()) {
            LOG.info((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
            Thread.sleep(500L);
            dagStatus = dagClient.getDAGStatus(null);
        }
        dagStatus = dagClient.getDAGStatus((Set)Sets.newHashSet((Object[])new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
        Assert.assertNotNull((Object)dagStatus.getDAGCounters());
        Assert.assertNotNull((Object)dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
        Assert.assertNotNull((Object)dagStatus.getDAGCounters().findCounter((Enum)TaskCounter.GC_TIME_MILLIS));
        ExampleDriver.printDAGStatus((DAGClient)dagClient, (String[])new String[]{"SleepVertex"}, (boolean)true, (boolean)true);
        tezSession.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testMultipleDAGsWithDuplicateName() throws TezException, IOException, InterruptedException {
        TezClient tezSession = null;
        try {
            TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
            Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
            remoteFs.mkdirs(remoteStagingDir);
            tezConf.set("tez.staging-dir", remoteStagingDir.toString());
            tezSession = TezClient.create((String)"OrderedWordCountSession", (TezConfiguration)tezConf, (boolean)true);
            tezSession.start();
            SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
            for (int dagIndex = 1; dagIndex <= 2; ++dagIndex) {
                DAG dag = DAG.create((String)"TezSleepProcessor");
                Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)1, (Resource)Resource.newInstance((int)1024, (int)1));
                dag.addVertex(vertex);
                DAGClient dagClient = null;
                try {
                    dagClient = tezSession.submitDAG(dag);
                    if (dagIndex > 1) {
                        Assert.fail((String)("Should fail due to duplicate dag name for dagIndex: " + dagIndex));
                    }
                }
                catch (TezException tex) {
                    if (dagIndex > 1) {
                        Assert.assertTrue((boolean)tex.getMessage().contains("Duplicate dag name "));
                        continue;
                    }
                    Assert.fail((String)"DuplicateDAGName exception thrown for 1st DAG submission");
                }
                DAGStatus dagStatus = dagClient.getDAGStatus(null);
                while (!dagStatus.isCompleted()) {
                    LOG.debug((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
                    Thread.sleep(500L);
                    dagStatus = dagClient.getDAGStatus(null);
                }
            }
        }
        finally {
            if (tezSession != null) {
                tezSession.stop();
            }
        }
    }

    @Test
    public void testNonDefaultFSStagingDir() throws Exception {
        SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
        DAG dag = DAG.create((String)"TezSleepProcessor");
        Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)1, (Resource)Resource.newInstance((int)1024, (int)1));
        dag.addVertex(vertex);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        Path stagingDir = new Path(TEST_ROOT_DIR, "testNonDefaultFSStagingDir" + String.valueOf(this.random.nextInt(100000)));
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)tezConf);
        stagingDir = localFs.makeQualified(stagingDir);
        localFs.mkdirs(stagingDir);
        tezConf.set("tez.staging-dir", stagingDir.toString());
        TezClient tezSession = TezClient.create((String)"TezSleepProcessor", (TezConfiguration)tezConf, (boolean)false);
        tezSession.start();
        DAGClient dagClient = tezSession.submitDAG(dag);
        DAGStatus dagStatus = dagClient.getDAGStatus(null);
        while (!dagStatus.isCompleted()) {
            LOG.info((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
            Thread.sleep(500L);
            dagStatus = dagClient.getDAGStatus(null);
        }
        dagStatus = dagClient.getDAGStatus((Set)Sets.newHashSet((Object[])new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
        Assert.assertNotNull((Object)dagStatus.getDAGCounters());
        Assert.assertNotNull((Object)dagStatus.getDAGCounters().getGroup(FileSystemCounter.class.getName()));
        Assert.assertNotNull((Object)dagStatus.getDAGCounters().findCounter((Enum)TaskCounter.GC_TIME_MILLIS));
        ExampleDriver.printDAGStatus((DAGClient)dagClient, (String[])new String[]{"SleepVertex"}, (boolean)true, (boolean)true);
        tezSession.stop();
    }

    @Test(timeout=60000L)
    public void testHistoryLogging() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        SleepProcessor.SleepProcessorConfig spConf = new SleepProcessor.SleepProcessorConfig(1);
        DAG dag = DAG.create((String)"TezSleepProcessorHistoryLogging");
        Vertex vertex = Vertex.create((String)"SleepVertex", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload())), (int)2, (Resource)Resource.newInstance((int)1024, (int)1));
        dag.addVertex(vertex);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(this.random.nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)tezConf);
        Path historyLogDir = new Path(TEST_ROOT_DIR, "testHistoryLogging");
        localFs.mkdirs(historyLogDir);
        tezConf.set("tez.simple.history.logging.dir", localFs.makeQualified(historyLogDir).toString());
        tezConf.setBoolean("tez.am.mode.session", false);
        TezClient tezSession = TezClient.create((String)"TezSleepProcessorHistoryLogging", (TezConfiguration)tezConf);
        tezSession.start();
        DAGClient dagClient = tezSession.submitDAG(dag);
        DAGStatus dagStatus = dagClient.getDAGStatus(null);
        while (!dagStatus.isCompleted()) {
            LOG.info((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
            Thread.sleep(500L);
            dagStatus = dagClient.getDAGStatus(null);
        }
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
        FileStatus historyLogFileStatus = null;
        for (FileStatus fileStatus : localFs.listStatus(historyLogDir)) {
            Path p;
            if (fileStatus.isDirectory() || !(p = fileStatus.getPath()).getName().startsWith("history.txt")) continue;
            historyLogFileStatus = fileStatus;
            break;
        }
        Assert.assertNotNull(historyLogFileStatus);
        Assert.assertTrue((historyLogFileStatus.getLen() > 0L ? 1 : 0) != 0);
        tezSession.stop();
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmit() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(false, false, false, false);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmitAndKill() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(false, true, false, false);
        Assert.assertEquals((Object)DAGStatus.State.KILLED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobViaSession() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, false);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
    }

    @Test(timeout=120000L)
    public void testAMRelocalization() throws Exception {
        Path relocPath = new Path("/tmp/relocalizationfilefound");
        if (remoteFs.exists(relocPath)) {
            remoteFs.delete(relocPath, true);
        }
        TezClient tezSession = this.createTezSession();
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertFalse((boolean)remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
        LOG.info((Object)"Creating jar for relocalization test");
        Path relocFilePath = new Path("/tmp/test.jar");
        relocFilePath = remoteFs.makeQualified(relocFilePath);
        FSDataOutputStream os = remoteFs.create(relocFilePath, true);
        TestMRRJobsDAGApi.createTestJar((OutputStream)os, RELOCALIZATION_TEST_CLASS_NAME);
        Path tezAppJar = new Path(MiniTezCluster.APPJAR);
        Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
        remoteFs.copyFromLocalFile(tezAppJar, tezAppJarRemote);
        HashMap<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
        additionalResources.put("test.jar", this.createLrObjFromPath(relocFilePath));
        additionalResources.put("TezAppJar.jar", this.createLrObjFromPath(tezAppJarRemote));
        Assert.assertEquals((Object)TezAppMasterStatus.READY, (Object)tezSession.getAppMasterStatus());
        finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezAppMasterStatus.READY, (Object)tezSession.getAppMasterStatus());
        Assert.assertTrue((boolean)remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
        this.stopAndVerifyYarnApp(tezSession);
    }

    private void stopAndVerifyYarnApp(TezClient tezSession) throws TezException, IOException, YarnException {
        ApplicationReport appReport;
        ApplicationId appId = tezSession.getAppMasterApplicationId();
        tezSession.stop();
        Assert.assertEquals((Object)TezAppMasterStatus.SHUTDOWN, (Object)tezSession.getAppMasterStatus());
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(mrrTezCluster.getConfig());
        yarnClient.start();
        while (!((appReport = yarnClient.getApplicationReport(appId)).getYarnApplicationState().equals((Object)YarnApplicationState.FINISHED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.FAILED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.KILLED))) {
        }
        appReport = yarnClient.getApplicationReport(appId);
        Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appReport.getYarnApplicationState());
        Assert.assertEquals((Object)FinalApplicationStatus.SUCCEEDED, (Object)appReport.getFinalApplicationStatus());
    }

    @Test(timeout=120000L)
    public void testAMRelocalizationConflict() throws Exception {
        Path relocPath = new Path("/tmp/relocalizationfilefound");
        if (remoteFs.exists(relocPath)) {
            remoteFs.delete(relocPath, true);
        }
        TezClient tezSession = this.createTezSession();
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertFalse((boolean)remoteFs.exists(relocPath));
        LOG.info((Object)"Creating jar for relocalization test");
        Path tezAppJar = new Path(MiniTezCluster.APPJAR);
        Path tezAppJarRemote = remoteFs.makeQualified(new Path("/tmp/" + tezAppJar.getName()));
        FSDataOutputStream os = remoteFs.create(tezAppJarRemote, true);
        TestMRRJobsDAGApi.createTestJar((OutputStream)os, RELOCALIZATION_TEST_CLASS_NAME);
        HashMap<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
        additionalResources.put("TezAppJar.jar", this.createLrObjFromPath(tezAppJarRemote));
        try {
            this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
            Assert.fail((String)"should have failed");
        }
        catch (Exception ex) {
            // empty catch block
        }
        this.stopAndVerifyYarnApp(tezSession);
    }

    private LocalResource createLrObjFromPath(Path filePath) {
        return LocalResource.newInstance((URL)ConverterUtils.getYarnUrlFromPath((Path)filePath), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PRIVATE, (long)0L, (long)0L);
    }

    private TezClient createTezSession() throws IOException, TezException {
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        TezClient tezSession = TezClient.create((String)"testrelocalizationsession", (TezConfiguration)tezConf, (boolean)true);
        tezSession.start();
        Assert.assertEquals((Object)TezAppMasterStatus.INITIALIZING, (Object)tezSession.getAppMasterStatus());
        return tezSession;
    }

    @Test(timeout=120000L)
    public void testMultipleMRRSleepJobViaSession() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        TezClient tezSession = TezClient.create((String)"testsession", (TezConfiguration)tezConf, (boolean)true);
        tezSession.start();
        Assert.assertEquals((Object)TezAppMasterStatus.INITIALIZING, (Object)tezSession.getAppMasterStatus());
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezAppMasterStatus.READY, (Object)tezSession.getAppMasterStatus());
        finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezAppMasterStatus.READY, (Object)tezSession.getAppMasterStatus());
        this.stopAndVerifyYarnApp(tezSession);
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, true, false, false);
        Assert.assertEquals((Object)DAGStatus.State.KILLED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testTezSessionShutdown() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        this.testMRRSleepJobDagSubmitCore(true, false, true, false);
    }

    @Test(timeout=60000L)
    public void testAMSplitGeneration() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        this.testMRRSleepJobDagSubmitCore(true, false, false, true);
    }

    public DAGStatus.State testMRRSleepJobDagSubmitCore(boolean dagViaRPC, boolean killDagWhileRunning, boolean closeSessionBeforeSubmit, boolean genSplitsInAM) throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        return this.testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning, closeSessionBeforeSubmit, null, genSplitsInAM, null, null);
    }

    public DAGStatus.State testMRRSleepJobDagSubmitCore(boolean dagViaRPC, boolean killDagWhileRunning, boolean closeSessionBeforeSubmit, TezClient reUseTezSession, boolean genSplitsInAM, Class<? extends InputInitializer> initializerClass, Map<String, LocalResource> additionalLocalResources) throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DataSourceDescriptor dsd;
        LOG.info((Object)"\n\n\nStarting testMRRSleepJobDagSubmit().");
        JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
        JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
        JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
        stage1Conf.setLong("mrr.sleepjob.map.sleep.time", 1L);
        stage1Conf.setInt("mrr.sleepjob.map.sleep.count", 1);
        stage1Conf.setInt("mapreduce.job.maps", 1);
        stage1Conf.set("mapreduce.job.map.class", MRRSleepJob.SleepMapper.class.getName());
        stage1Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage1Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        stage1Conf.set("mapreduce.job.inputformat.class", MRRSleepJob.SleepInputFormat.class.getName());
        stage1Conf.set("mapreduce.job.partitioner.class", MRRSleepJob.MRRSleepJobPartitioner.class.getName());
        stage2Conf.setLong("mrr.sleepjob.reduce.sleep.time", 1L);
        stage2Conf.setInt("mrr.sleepjob.reduce.sleep.count", 1);
        stage2Conf.setInt("mapreduce.job.reduces", 1);
        stage2Conf.set("mapreduce.job.reduce.class", MRRSleepJob.ISleepReducer.class.getName());
        stage2Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage2Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        stage2Conf.set("mapreduce.job.partitioner.class", MRRSleepJob.MRRSleepJobPartitioner.class.getName());
        stage3Conf.setLong("mrr.sleepjob.reduce.sleep.time", 1L);
        stage3Conf.setInt("mrr.sleepjob.reduce.sleep.count", 1);
        stage3Conf.setInt("mapreduce.job.reduces", 1);
        stage3Conf.set("mapreduce.job.reduce.class", MRRSleepJob.SleepReducer.class.getName());
        stage3Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage3Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        MRHelpers.translateMRConfToTez((Configuration)stage1Conf);
        MRHelpers.translateMRConfToTez((Configuration)stage2Conf);
        MRHelpers.translateMRConfToTez((Configuration)stage3Conf);
        MRHelpers.configureMRApiUsage((Configuration)stage1Conf);
        MRHelpers.configureMRApiUsage((Configuration)stage2Conf);
        MRHelpers.configureMRApiUsage((Configuration)stage3Conf);
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        TezClientUtils.ensureStagingDirExists((Configuration)conf, (Path)remoteStagingDir);
        UserPayload stage1Payload = TezUtils.createUserPayloadFromConf((Configuration)stage1Conf);
        UserPayload stage2Payload = TezUtils.createUserPayloadFromConf((Configuration)stage2Conf);
        UserPayload stage3Payload = TezUtils.createUserPayloadFromConf((Configuration)stage3Conf);
        DAG dag = DAG.create((String)("testMRRSleepJobDagSubmit-" + this.random.nextInt(1000)));
        Class inputInitializerClazz = genSplitsInAM ? (initializerClass == null ? MRInputAMSplitGenerator.class : initializerClass) : null;
        LOG.info((Object)("Using initializer class: " + initializerClass));
        if (!genSplitsInAM) {
            dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration((Configuration)stage1Conf, (Path)remoteStagingDir, (boolean)true);
        } else if (initializerClass == null) {
            dsd = MRInputLegacy.createConfigBuilder((Configuration)stage1Conf, MRRSleepJob.SleepInputFormat.class).build();
        } else {
            InputInitializerDescriptor iid = InputInitializerDescriptor.create((String)inputInitializerClazz.getName());
            dsd = MRInputLegacy.createConfigBuilder((Configuration)stage1Conf, MRRSleepJob.SleepInputFormat.class).setCustomInitializerDescriptor(iid).build();
        }
        Vertex stage1Vertex = Vertex.create((String)"map", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)MapProcessor.class.getName()).setUserPayload(stage1Payload)), (int)dsd.getNumberOfShards(), (Resource)Resource.newInstance((int)256, (int)1));
        stage1Vertex.addDataSource("MRInput", dsd);
        Vertex stage2Vertex = Vertex.create((String)"ireduce", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(stage2Payload)), (int)1, (Resource)Resource.newInstance((int)256, (int)1));
        Vertex stage3Vertex = Vertex.create((String)"reduce", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(stage3Payload)), (int)1, (Resource)Resource.newInstance((int)256, (int)1));
        stage3Vertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder((Configuration)stage3Conf, NullOutputFormat.class).build());
        dag.addVertex(stage1Vertex);
        dag.addVertex(stage2Vertex);
        dag.addVertex(stage3Vertex);
        Edge edge1 = Edge.create((Vertex)stage1Vertex, (Vertex)stage2Vertex, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)((OutputDescriptor)OutputDescriptor.create((String)OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage2Payload)), (InputDescriptor)((InputDescriptor)InputDescriptor.create((String)OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage2Payload))));
        Edge edge2 = Edge.create((Vertex)stage2Vertex, (Vertex)stage3Vertex, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)((OutputDescriptor)OutputDescriptor.create((String)OrderedPartitionedKVOutput.class.getName()).setUserPayload(stage3Payload)), (InputDescriptor)((InputDescriptor)InputDescriptor.create((String)OrderedGroupedInputLegacy.class.getName()).setUserPayload(stage3Payload))));
        dag.addEdge(edge1);
        dag.addEdge(edge2);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        DAGClient dagClient = null;
        boolean reuseSession = reUseTezSession != null;
        TezClient tezSession = null;
        if (!dagViaRPC) {
            Preconditions.checkArgument((!reuseSession ? 1 : 0) != 0);
        }
        if (!reuseSession) {
            TezConfiguration tempTezconf = new TezConfiguration((Configuration)tezConf);
            if (!dagViaRPC) {
                tempTezconf.setBoolean("tez.am.mode.session", false);
            } else {
                tempTezconf.setBoolean("tez.am.mode.session", true);
            }
            tezSession = TezClient.create((String)"testsession", (TezConfiguration)tempTezconf);
            tezSession.start();
        } else {
            tezSession = reUseTezSession;
        }
        if (!dagViaRPC) {
            dagClient = tezSession.submitDAG(dag);
        }
        if (dagViaRPC && closeSessionBeforeSubmit) {
            YarnApplicationState appState;
            ApplicationReport appReport;
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            boolean sentKillSession = false;
            while (true) {
                Thread.sleep(500L);
                appReport = yarnClient.getApplicationReport(tezSession.getAppMasterApplicationId());
                if (appReport == null) continue;
                appState = appReport.getYarnApplicationState();
                if (!sentKillSession) {
                    if (appState != YarnApplicationState.RUNNING) continue;
                    tezSession.stop();
                    sentKillSession = true;
                    continue;
                }
                if (appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.KILLED || appState == YarnApplicationState.FAILED) break;
            }
            LOG.info((Object)("Application completed after sending session shutdown, yarnApplicationState=" + appState + ", finalAppStatus=" + appReport.getFinalApplicationStatus()));
            Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appState);
            Assert.assertEquals((Object)FinalApplicationStatus.SUCCEEDED, (Object)appReport.getFinalApplicationStatus());
            yarnClient.stop();
            return null;
        }
        if (dagViaRPC) {
            LOG.info((Object)("Submitting dag to tez session with appId=" + tezSession.getAppMasterApplicationId() + " and Dag Name=" + dag.getName()));
            if (additionalLocalResources != null) {
                tezSession.addAppMasterLocalFiles(additionalLocalResources);
            }
            dagClient = tezSession.submitDAG(dag);
            Assert.assertEquals((Object)TezAppMasterStatus.RUNNING, (Object)tezSession.getAppMasterStatus());
        }
        DAGStatus dagStatus = dagClient.getDAGStatus(null);
        while (!dagStatus.isCompleted()) {
            LOG.info((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
            Thread.sleep(500L);
            if (killDagWhileRunning && dagStatus.getState() == DAGStatus.State.RUNNING) {
                LOG.info((Object)"Killing running dag/session");
                if (dagViaRPC) {
                    tezSession.stop();
                } else {
                    dagClient.tryKillDAG();
                }
            }
            dagStatus = dagClient.getDAGStatus(null);
        }
        if (!reuseSession) {
            tezSession.stop();
        }
        return dagStatus.getState();
    }

    private static LocalResource createLocalResource(FileSystem fc, Path file, LocalResourceType type, LocalResourceVisibility visibility) throws IOException {
        FileStatus fstat = fc.getFileStatus(file);
        URL resourceURL = ConverterUtils.getYarnUrlFromPath((Path)fc.resolvePath(fstat.getPath()));
        long resourceSize = fstat.getLen();
        long resourceModificationTime = fstat.getModificationTime();
        return LocalResource.newInstance((URL)resourceURL, (LocalResourceType)type, (LocalResourceVisibility)visibility, (long)resourceSize, (long)resourceModificationTime);
    }

    @Test(timeout=60000L)
    public void testVertexGroups() throws Exception {
        LOG.info((Object)"Running Group Test");
        Path inPath = new Path(TEST_ROOT_DIR, "in-groups");
        Path outPath = new Path(TEST_ROOT_DIR, "out-groups");
        FSDataOutputStream out = remoteFs.create(inPath);
        OutputStreamWriter writer = new OutputStreamWriter((OutputStream)out);
        writer.write("abcd ");
        writer.write("efgh ");
        writer.write("abcd ");
        writer.write("efgh ");
        writer.close();
        out.close();
        UnionExample job = new UnionExample();
        if (!job.run(inPath.toString(), outPath.toString(), mrrTezCluster.getConfig())) {
            throw new TezUncheckedException("VertexGroups Test Failed");
        }
        LOG.info((Object)"Success VertexGroups Test");
    }

    @Test(timeout=60000L)
    public void testBroadcastAndOneToOne() throws Exception {
        LOG.info((Object)"Running BroadcastAndOneToOne Test");
        BroadcastAndOneToOneExample job = new BroadcastAndOneToOneExample();
        if (!job.run(mrrTezCluster.getConfig(), true)) {
            throw new TezUncheckedException("BroadcastAndOneToOne Test Failed");
        }
        LOG.info((Object)"Success BroadcastAndOneToOne Test");
    }

    private static void createTestJar(OutputStream outStream, String dummyClassName) throws URISyntaxException, IOException {
        int nRead;
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        SimpleJavaFileObjectImpl srcFileObject = new SimpleJavaFileObjectImpl(URI.create("string:///" + dummyClassName + JavaFileObject.Kind.SOURCE.extension), JavaFileObject.Kind.SOURCE);
        StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
        compiler.getTask(null, fileManager, null, null, null, Collections.singletonList(srcFileObject)).call();
        JavaFileObject javaFileObject = fileManager.getJavaFileForOutput(StandardLocation.CLASS_OUTPUT, dummyClassName, JavaFileObject.Kind.CLASS, null);
        File classFile = new File(dummyClassName + JavaFileObject.Kind.CLASS.extension);
        JarOutputStream jarOutputStream = new JarOutputStream(outStream);
        JarEntry jarEntry = new JarEntry(classFile.getName());
        jarEntry.setTime(classFile.lastModified());
        jarOutputStream.putNextEntry(jarEntry);
        InputStream in = javaFileObject.openInputStream();
        byte[] buffer = new byte[4096];
        while ((nRead = in.read(buffer, 0, buffer.length)) > 0) {
            jarOutputStream.write(buffer, 0, nRead);
        }
        in.close();
        jarOutputStream.close();
        javaFileObject.delete();
    }

    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
        RELOCALIZATION_TEST_CLASS_NAME = "AMClassloadTestDummyClass";
    }

    private static class SimpleJavaFileObjectImpl
    extends SimpleJavaFileObject {
        static final String code = "public class AMClassloadTestDummyClass {}";

        SimpleJavaFileObjectImpl(URI uri, JavaFileObject.Kind kind) {
            super(uri, kind);
        }

        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) {
            return code;
        }
    }

    public static class MRInputAMSplitGeneratorRelocalizationTest
    extends MRInputAMSplitGenerator {
        public MRInputAMSplitGeneratorRelocalizationTest(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        public List<Event> initialize() throws Exception {
            MRRuntimeProtos.MRInputUserPayloadProto userPayloadProto = MRInputHelpers.parseMRInputPayload((UserPayload)this.getContext().getInputUserPayload());
            Configuration conf = TezUtils.createConfFromByteString((ByteString)userPayloadProto.getConfigurationBytes());
            try {
                ReflectionUtils.getClazz((String)RELOCALIZATION_TEST_CLASS_NAME);
                LOG.info((Object)"Class found");
                FileSystem fs = FileSystem.get((Configuration)conf);
                fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
            }
            catch (TezUncheckedException e) {
                LOG.info((Object)"Class not found");
            }
            return super.initialize();
        }
    }
}

