/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce;

import com.google.protobuf.ByteString;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.StandardJavaFileManager;
import javax.tools.StandardLocation;
import javax.tools.ToolProvider;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.AMConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezSession;
import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.client.TezSessionStatus;
import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
import org.apache.tez.mapreduce.examples.UnionExample;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezRootInputInitializer;
import org.apache.tez.runtime.api.TezRootInputInitializerContext;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.apache.tez.test.MiniTezCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestMRRJobsDAGApi {
    private static final Log LOG = LogFactory.getLog(TestMRRJobsDAGApi.class);
    protected static MiniTezCluster mrrTezCluster;
    protected static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static FileSystem remoteFs;
    private static String TEST_ROOT_DIR;
    private static String RELOCALIZATION_TEST_CLASS_NAME;

    @BeforeClass
    public static void setup() throws IOException {
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        if (mrrTezCluster == null) {
            mrrTezCluster = new MiniTezCluster(TestMRRJobsDAGApi.class.getName(), 1, 1, 1);
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", remoteFs.getUri().toString());
            conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
            mrrTezCluster.init(conf);
            mrrTezCluster.start();
        }
    }

    @AfterClass
    public static void tearDown() {
        if (mrrTezCluster != null) {
            mrrTezCluster.stop();
            mrrTezCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmit() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(false, false, false, false);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmitAndKill() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(false, true, false, false);
        Assert.assertEquals((Object)DAGStatus.State.KILLED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobViaSession() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, false);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
    }

    @Test(timeout=120000L)
    public void testAMRelocalization() throws Exception {
        ApplicationReport appReport;
        Map<String, String> commonEnv = this.createCommonEnv();
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        HashMap amLocalResources = new HashMap();
        AMConfiguration amConfig = new AMConfiguration(commonEnv, amLocalResources, tezConf, null);
        TezSessionConfiguration tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
        TezSession tezSession = new TezSession("testrelocalizationsession", tezSessionConfig);
        tezSession.start();
        Assert.assertEquals((Object)TezSessionStatus.INITIALIZING, (Object)tezSession.getSessionStatus());
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertFalse((boolean)remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
        LOG.info((Object)"Creating jar for relocalization test");
        Path relocFilePath = new Path("/tmp/test.jar");
        relocFilePath = remoteFs.makeQualified(relocFilePath);
        FSDataOutputStream os = remoteFs.create(relocFilePath, true);
        TestMRRJobsDAGApi.createTestJar((OutputStream)os, RELOCALIZATION_TEST_CLASS_NAME);
        HashMap<String, LocalResource> additionalResources = new HashMap<String, LocalResource>();
        additionalResources.put("test.jar", LocalResource.newInstance((URL)ConverterUtils.getYarnUrlFromPath((Path)relocFilePath), (LocalResourceType)LocalResourceType.FILE, (LocalResourceVisibility)LocalResourceVisibility.PRIVATE, (long)0L, (long)0L));
        Assert.assertEquals((Object)TezSessionStatus.READY, (Object)tezSession.getSessionStatus());
        finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, true, MRInputAMSplitGeneratorRelocalizationTest.class, additionalResources);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezSessionStatus.READY, (Object)tezSession.getSessionStatus());
        Assert.assertTrue((boolean)remoteFs.exists(new Path("/tmp/relocalizationfilefound")));
        ApplicationId appId = tezSession.getApplicationId();
        tezSession.stop();
        Assert.assertEquals((Object)TezSessionStatus.SHUTDOWN, (Object)tezSession.getSessionStatus());
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(mrrTezCluster.getConfig());
        yarnClient.start();
        while (!((appReport = yarnClient.getApplicationReport(appId)).getYarnApplicationState().equals((Object)YarnApplicationState.FINISHED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.FAILED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.KILLED))) {
        }
        appReport = yarnClient.getApplicationReport(appId);
        Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appReport.getYarnApplicationState());
        Assert.assertEquals((Object)FinalApplicationStatus.SUCCEEDED, (Object)appReport.getFinalApplicationStatus());
    }

    @Test(timeout=120000L)
    public void testMultipleMRRSleepJobViaSession() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        ApplicationReport appReport;
        Map<String, String> commonEnv = this.createCommonEnv();
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        remoteFs.mkdirs(remoteStagingDir);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        HashMap amLocalResources = new HashMap();
        AMConfiguration amConfig = new AMConfiguration(commonEnv, amLocalResources, tezConf, null);
        TezSessionConfiguration tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
        TezSession tezSession = new TezSession("testsession", tezSessionConfig);
        tezSession.start();
        Assert.assertEquals((Object)TezSessionStatus.INITIALIZING, (Object)tezSession.getSessionStatus());
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezSessionStatus.READY, (Object)tezSession.getSessionStatus());
        finalState = this.testMRRSleepJobDagSubmitCore(true, false, false, tezSession, false, null, null);
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)finalState);
        Assert.assertEquals((Object)TezSessionStatus.READY, (Object)tezSession.getSessionStatus());
        ApplicationId appId = tezSession.getApplicationId();
        tezSession.stop();
        Assert.assertEquals((Object)TezSessionStatus.SHUTDOWN, (Object)tezSession.getSessionStatus());
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(mrrTezCluster.getConfig());
        yarnClient.start();
        while (!((appReport = yarnClient.getApplicationReport(appId)).getYarnApplicationState().equals((Object)YarnApplicationState.FINISHED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.FAILED) || appReport.getYarnApplicationState().equals((Object)YarnApplicationState.KILLED))) {
        }
        appReport = yarnClient.getApplicationReport(appId);
        Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appReport.getYarnApplicationState());
        Assert.assertEquals((Object)FinalApplicationStatus.SUCCEEDED, (Object)appReport.getFinalApplicationStatus());
    }

    @Test(timeout=60000L)
    public void testMRRSleepJobDagSubmitAndKillViaRPC() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        DAGStatus.State finalState = this.testMRRSleepJobDagSubmitCore(true, true, false, false);
        Assert.assertEquals((Object)DAGStatus.State.KILLED, (Object)finalState);
    }

    @Test(timeout=60000L)
    public void testTezSessionShutdown() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        this.testMRRSleepJobDagSubmitCore(true, false, true, false);
    }

    @Test(timeout=60000L)
    public void testAMSplitGeneration() throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        this.testMRRSleepJobDagSubmitCore(true, false, false, true);
    }

    public DAGStatus.State testMRRSleepJobDagSubmitCore(boolean dagViaRPC, boolean killDagWhileRunning, boolean closeSessionBeforeSubmit, boolean genSplitsInAM) throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        return this.testMRRSleepJobDagSubmitCore(dagViaRPC, killDagWhileRunning, closeSessionBeforeSubmit, null, genSplitsInAM, null, null);
    }

    private Map<String, String> createCommonEnv() {
        HashMap<String, String> commonEnv = new HashMap<String, String>();
        return commonEnv;
    }

    public DAGStatus.State testMRRSleepJobDagSubmitCore(boolean dagViaRPC, boolean killDagWhileRunning, boolean closeSessionBeforeSubmit, TezSession reUseTezSession, boolean genSplitsInAM, Class<? extends TezRootInputInitializer> initializerClass, Map<String, LocalResource> additionalLocalResources) throws IOException, InterruptedException, TezException, ClassNotFoundException, YarnException {
        int stage1NumTasks;
        LOG.info((Object)"\n\n\nStarting testMRRSleepJobDagSubmit().");
        JobConf stage1Conf = new JobConf(mrrTezCluster.getConfig());
        JobConf stage2Conf = new JobConf(mrrTezCluster.getConfig());
        JobConf stage3Conf = new JobConf(mrrTezCluster.getConfig());
        stage1Conf.setLong("mrr.sleepjob.map.sleep.time", 1L);
        stage1Conf.setInt("mrr.sleepjob.map.sleep.count", 1);
        stage1Conf.setInt("mapreduce.job.maps", 1);
        stage1Conf.set("mapreduce.job.map.class", MRRSleepJob.SleepMapper.class.getName());
        stage1Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage1Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        stage1Conf.set("mapreduce.job.inputformat.class", MRRSleepJob.SleepInputFormat.class.getName());
        stage1Conf.set("mapreduce.job.partitioner.class", MRRSleepJob.MRRSleepJobPartitioner.class.getName());
        stage2Conf.setLong("mrr.sleepjob.reduce.sleep.time", 1L);
        stage2Conf.setInt("mrr.sleepjob.reduce.sleep.count", 1);
        stage2Conf.setInt("mapreduce.job.reduces", 1);
        stage2Conf.set("mapreduce.job.reduce.class", MRRSleepJob.ISleepReducer.class.getName());
        stage2Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage2Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        stage2Conf.set("mapreduce.job.partitioner.class", MRRSleepJob.MRRSleepJobPartitioner.class.getName());
        JobConf stage22Conf = new JobConf((Configuration)stage2Conf);
        stage22Conf.setInt("mapreduce.job.reduces", 2);
        stage3Conf.setLong("mrr.sleepjob.reduce.sleep.time", 1L);
        stage3Conf.setInt("mrr.sleepjob.reduce.sleep.count", 1);
        stage3Conf.setInt("mapreduce.job.reduces", 1);
        stage3Conf.set("mapreduce.job.reduce.class", MRRSleepJob.SleepReducer.class.getName());
        stage3Conf.set("mapreduce.map.output.key.class", IntWritable.class.getName());
        stage3Conf.set("mapreduce.map.output.value.class", IntWritable.class.getName());
        stage3Conf.set("mapreduce.job.outputformat.class", NullOutputFormat.class.getName());
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)stage1Conf, null);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)stage2Conf, (Configuration)stage1Conf);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)stage22Conf, (Configuration)stage1Conf);
        MultiStageMRConfToTezTranslator.translateVertexConfToTez((Configuration)stage3Conf, (Configuration)stage2Conf);
        MRHelpers.doJobClientMagic((Configuration)stage1Conf);
        MRHelpers.doJobClientMagic((Configuration)stage2Conf);
        MRHelpers.doJobClientMagic((Configuration)stage22Conf);
        MRHelpers.doJobClientMagic((Configuration)stage3Conf);
        Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(new Random().nextInt(100000))));
        TezClientUtils.ensureStagingDirExists((Configuration)conf, (Path)remoteStagingDir);
        InputSplitInfoDisk inputSplitInfo = null;
        if (!genSplitsInAM) {
            inputSplitInfo = MRHelpers.generateInputSplits((Configuration)stage1Conf, (Path)remoteStagingDir);
        }
        byte[] stage1Payload = MRHelpers.createUserPayloadFromConf((Configuration)stage1Conf);
        byte[] stage1InputPayload = MRHelpers.createMRInputPayload((byte[])stage1Payload, null);
        byte[] stage3Payload = MRHelpers.createUserPayloadFromConf((Configuration)stage3Conf);
        DAG dag = new DAG("testMRRSleepJobDagSubmit");
        int n = stage1NumTasks = genSplitsInAM ? -1 : inputSplitInfo.getNumTasks();
        Class inputInitializerClazz = genSplitsInAM ? (initializerClass == null ? MRInputAMSplitGenerator.class : initializerClass) : null;
        LOG.info((Object)("Using initializer class: " + initializerClass));
        Vertex stage1Vertex = new Vertex("map", new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(stage1Payload), stage1NumTasks, Resource.newInstance((int)256, (int)1));
        MRHelpers.addMRInput((Vertex)stage1Vertex, (byte[])stage1InputPayload, inputInitializerClazz);
        Vertex stage2Vertex = new Vertex("ireduce", new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(MRHelpers.createUserPayloadFromConf((Configuration)stage2Conf)), 1, Resource.newInstance((int)256, (int)1));
        Vertex stage3Vertex = new Vertex("reduce", new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(stage3Payload), 1, Resource.newInstance((int)256, (int)1));
        MRHelpers.addMROutputLegacy((Vertex)stage3Vertex, (byte[])stage3Payload);
        HashMap commonLocalResources = new HashMap();
        Map<String, String> commonEnv = this.createCommonEnv();
        if (!genSplitsInAM) {
            HashMap<String, Object> stage1LocalResources = new HashMap<String, Object>();
            stage1LocalResources.put(inputSplitInfo.getSplitsFile().getName(), TestMRRJobsDAGApi.createLocalResource(remoteFs, inputSplitInfo.getSplitsFile(), LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
            stage1LocalResources.put(inputSplitInfo.getSplitsMetaInfoFile().getName(), TestMRRJobsDAGApi.createLocalResource(remoteFs, inputSplitInfo.getSplitsMetaInfoFile(), LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
            stage1LocalResources.putAll(commonLocalResources);
            stage1Vertex.setTaskLocalResources(stage1LocalResources);
            stage1Vertex.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
        } else {
            stage1Vertex.setTaskLocalResources(commonLocalResources);
        }
        stage1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts((Configuration)stage1Conf));
        stage1Vertex.setTaskEnvironment(commonEnv);
        stage2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts((Configuration)stage2Conf));
        stage2Vertex.setTaskLocalResources(commonLocalResources);
        stage2Vertex.setTaskEnvironment(commonEnv);
        stage3Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts((Configuration)stage3Conf));
        stage3Vertex.setTaskLocalResources(commonLocalResources);
        stage3Vertex.setTaskEnvironment(commonEnv);
        dag.addVertex(stage1Vertex);
        dag.addVertex(stage2Vertex);
        dag.addVertex(stage3Vertex);
        Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName())));
        Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class.getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName())));
        dag.addEdge(edge1);
        dag.addEdge(edge2);
        HashMap amLocalResources = new HashMap();
        amLocalResources.putAll(commonLocalResources);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", remoteStagingDir.toString());
        TezClient tezClient = new TezClient(tezConf);
        DAGClient dagClient = null;
        TezSession tezSession = null;
        boolean reuseSession = reUseTezSession != null;
        AMConfiguration amConfig = new AMConfiguration(commonEnv, amLocalResources, tezConf, null);
        if (!dagViaRPC) {
            dagClient = tezClient.submitDAGApplication(dag, amConfig);
        } else if (reuseSession) {
            tezSession = reUseTezSession;
        } else {
            TezSessionConfiguration tezSessionConfig = new TezSessionConfiguration(amConfig, tezConf);
            tezSession = new TezSession("testsession", tezSessionConfig);
            tezSession.start();
        }
        if (dagViaRPC && closeSessionBeforeSubmit) {
            YarnApplicationState appState;
            ApplicationReport appReport;
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            boolean sentKillSession = false;
            while (true) {
                Thread.sleep(500L);
                appReport = yarnClient.getApplicationReport(tezSession.getApplicationId());
                if (appReport == null) continue;
                appState = appReport.getYarnApplicationState();
                if (!sentKillSession) {
                    if (appState != YarnApplicationState.RUNNING) continue;
                    tezSession.stop();
                    sentKillSession = true;
                    continue;
                }
                if (appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.KILLED || appState == YarnApplicationState.FAILED) break;
            }
            LOG.info((Object)("Application completed after sending session shutdown, yarnApplicationState=" + appState + ", finalAppStatus=" + appReport.getFinalApplicationStatus()));
            Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appState);
            Assert.assertEquals((Object)FinalApplicationStatus.SUCCEEDED, (Object)appReport.getFinalApplicationStatus());
            yarnClient.stop();
            return null;
        }
        if (dagViaRPC) {
            LOG.info((Object)("Submitting dag to tez session with appId=" + tezSession.getApplicationId()));
            dagClient = tezSession.submitDAG(dag, additionalLocalResources);
            Assert.assertEquals((Object)TezSessionStatus.RUNNING, (Object)tezSession.getSessionStatus());
        }
        DAGStatus dagStatus = dagClient.getDAGStatus(null);
        while (!dagStatus.isCompleted()) {
            LOG.info((Object)("Waiting for job to complete. Sleeping for 500ms. Current state: " + dagStatus.getState()));
            Thread.sleep(500L);
            if (killDagWhileRunning && dagStatus.getState() == DAGStatus.State.RUNNING) {
                LOG.info((Object)"Killing running dag/session");
                if (dagViaRPC) {
                    tezSession.stop();
                } else {
                    dagClient.tryKillDAG();
                }
            }
            dagStatus = dagClient.getDAGStatus(null);
        }
        if (dagViaRPC && !reuseSession) {
            tezSession.stop();
        }
        return dagStatus.getState();
    }

    private static LocalResource createLocalResource(FileSystem fc, Path file, LocalResourceType type, LocalResourceVisibility visibility) throws IOException {
        FileStatus fstat = fc.getFileStatus(file);
        URL resourceURL = ConverterUtils.getYarnUrlFromPath((Path)fc.resolvePath(fstat.getPath()));
        long resourceSize = fstat.getLen();
        long resourceModificationTime = fstat.getModificationTime();
        return LocalResource.newInstance((URL)resourceURL, (LocalResourceType)type, (LocalResourceVisibility)visibility, (long)resourceSize, (long)resourceModificationTime);
    }

    @Test(timeout=60000L)
    public void testVertexGroups() throws Exception {
        LOG.info((Object)"Running Group Test");
        Path inPath = new Path(TEST_ROOT_DIR, "in");
        Path outPath = new Path(TEST_ROOT_DIR, "out");
        FSDataOutputStream out = remoteFs.create(inPath);
        OutputStreamWriter writer = new OutputStreamWriter((OutputStream)out);
        writer.write("abcd ");
        writer.write("efgh ");
        writer.write("abcd ");
        writer.write("efgh ");
        writer.close();
        out.close();
        UnionExample job = new UnionExample();
        if (!job.run(inPath.toString(), outPath.toString(), mrrTezCluster.getConfig())) {
            throw new TezUncheckedException("VertexGroups Test Failed");
        }
        LOG.info((Object)"Success VertexGroups Test");
    }

    private static void createTestJar(OutputStream outStream, String dummyClassName) throws URISyntaxException, IOException {
        int nRead;
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        SimpleJavaFileObjectImpl srcFileObject = new SimpleJavaFileObjectImpl(URI.create("string:///" + dummyClassName + JavaFileObject.Kind.SOURCE.extension), JavaFileObject.Kind.SOURCE);
        StandardJavaFileManager fileManager = compiler.getStandardFileManager(null, null, null);
        compiler.getTask(null, fileManager, null, null, null, Collections.singletonList(srcFileObject)).call();
        JavaFileObject javaFileObject = fileManager.getJavaFileForOutput(StandardLocation.CLASS_OUTPUT, dummyClassName, JavaFileObject.Kind.CLASS, null);
        File classFile = new File(dummyClassName + JavaFileObject.Kind.CLASS.extension);
        JarOutputStream jarOutputStream = new JarOutputStream(outStream);
        JarEntry jarEntry = new JarEntry(classFile.getName());
        jarEntry.setTime(classFile.lastModified());
        jarOutputStream.putNextEntry(jarEntry);
        InputStream in = javaFileObject.openInputStream();
        byte[] buffer = new byte[4096];
        while ((nRead = in.read(buffer, 0, buffer.length)) > 0) {
            jarOutputStream.write(buffer, 0, nRead);
        }
        in.close();
        jarOutputStream.close();
        javaFileObject.delete();
    }

    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestMRRJobsDAGApi.class.getName() + "-tmpDir";
        RELOCALIZATION_TEST_CLASS_NAME = "AMClassloadTestDummyClass";
    }

    private static class SimpleJavaFileObjectImpl
    extends SimpleJavaFileObject {
        static final String code = "public class AMClassloadTestDummyClass {}";

        SimpleJavaFileObjectImpl(URI uri, JavaFileObject.Kind kind) {
            super(uri, kind);
        }

        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) {
            return code;
        }
    }

    public static class MRInputAMSplitGeneratorRelocalizationTest
    extends MRInputAMSplitGenerator {
        public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception {
            MRRuntimeProtos.MRInputUserPayloadProto userPayloadProto = MRHelpers.parseMRInputPayload((byte[])rootInputContext.getUserPayload());
            Configuration conf = MRHelpers.createConfFromByteString((ByteString)userPayloadProto.getConfigurationBytes());
            try {
                RuntimeUtils.getClazz((String)RELOCALIZATION_TEST_CLASS_NAME);
                LOG.info((Object)"Class found");
                FileSystem fs = FileSystem.get((Configuration)conf);
                fs.mkdirs(new Path("/tmp/relocalizationfilefound"));
            }
            catch (TezUncheckedException e) {
                LOG.info((Object)"Class not found");
            }
            return super.initialize(rootInputContext);
        }
    }
}

