package org.apache.oozie.action.hadoop;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.jar.JarOutputStream;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.ClassUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;

/* loaded from: input_file:org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.class */
public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
    /**
     * Runs the inherited system-property setup and then sets two additional properties:
     * the action-service executor class list (to {@link MapReduceActionExecutor}) and the
     * credential class mapping used by the credential tests in this class.
     *
     * @throws Exception propagated from the calls made here
     */
    @Override
    public void setSystemProps() throws Exception {
        super.setSystemProps();
        final String executorClassName = MapReduceActionExecutor.class.getName();
        final String credentialMapping = "cred=org.apache.oozie.action.hadoop.CredentialForTest";
        setSystemProperty("oozie.service.ActionService.executor.classes", executorClassName);
        setSystemProperty("oozie.credentials.credentialclasses", credentialMapping);
    }

    /**
     * Asserts that a local file exists at the location formed from the executor's runtime
     * directory and its launcher jar name.
     */
    public void testLauncherJar() throws Exception {
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        Path launcherJar = new Path(executor.getOozieRuntimeDir(), executor.getLauncherJarName());
        File launcherJarFile = new File(launcherJar.toString());
        assertTrue(launcherJarFile.exists());
    }

    /**
     * Builds and parses a map-reduce action element whose configuration contains a single
     * {@code oozie.mapreduce.uber.jar} property.
     *
     * @param str  value written into the {@code oozie.mapreduce.uber.jar} property
     * @param str2 raw XML inserted between the name-node element and the configuration element
     * @return the parsed action element
     * @throws Exception if the assembled XML cannot be parsed
     */
    public Element createUberJarActionXML(String str, String str2) throws Exception {
        StringBuilder actionXml = new StringBuilder();
        actionXml.append("<map-reduce><job-tracker>").append(getJobTrackerUri());
        actionXml.append("</job-tracker><name-node>").append(getNameNodeUri());
        actionXml.append("</name-node>").append(str2);
        actionXml.append("<configuration><property><name>oozie.mapreduce.uber.jar</name><value>").append(str);
        actionXml.append("</value></property></configuration></map-reduce>");
        return XmlUtils.parseXml(actionXml.toString());
    }

    /**
     * Exercises the executor's static setup: type and launcher jar name, the launcher class list,
     * and the action configuration produced by {@code setupActionConf} for plain, uber-jar,
     * streaming and pipes actions.
     *
     * <p>The uber-jar section toggles {@code oozie.action.mapreduce.uber.jar.enable} on the
     * services configuration; the original value is restored in a {@code finally} block so a
     * failing assertion cannot leave the flag modified.
     *
     * @throws Exception if any executor call fails unexpectedly
     */
    public void testSetupMethods() throws Exception {
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        assertEquals("map-reduce", executor.getType());
        assertEquals("map-reduce-launcher.jar", executor.getLauncherJarName());

        ArrayList<Class<?>> expectedLauncherClasses = new ArrayList<Class<?>>();
        expectedLauncherClasses.add(LauncherMapper.class);
        expectedLauncherClasses.add(LauncherSecurityManager.class);
        expectedLauncherClasses.add(LauncherException.class);
        expectedLauncherClasses.add(LauncherMainException.class);
        expectedLauncherClasses.add(FileSystemActions.class);
        expectedLauncherClasses.add(PrepareActionsDriver.class);
        expectedLauncherClasses.add(ActionStats.class);
        expectedLauncherClasses.add(ActionType.class);
        expectedLauncherClasses.add(LauncherMain.class);
        expectedLauncherClasses.add(MapReduceMain.class);
        expectedLauncherClasses.add(StreamingMain.class);
        expectedLauncherClasses.add(PipesMain.class);
        assertEquals(expectedLauncherClasses, executor.getLauncherClasses());

        // Plain map-reduce action: configuration properties are copied into the action conf.
        Element plainXml = XmlUtils.parseXml("<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node><configuration><property><name>mapred.input.dir</name><value>IN</value></property><property><name>mapred.output.dir</name><value>OUT</value></property></configuration></map-reduce>");
        XConfiguration protoConf = new XConfiguration();
        protoConf.set("user.name", getTestUser());
        WorkflowJobBean workflow = createBaseWorkflow(protoConf, "mr-action");
        WorkflowActionBean action = (WorkflowActionBean) workflow.getActions().get(0);
        action.setType(executor.getType());
        ActionExecutorTestCase.Context context = new ActionExecutorTestCase.Context(workflow, action);
        JobConf plainConf = executor.createBaseHadoopConf(context, plainXml);
        executor.setupActionConf(plainConf, context, plainXml, getFsTestCaseDir());
        assertEquals("IN", plainConf.get("mapred.input.dir"));

        Services services = Services.get();
        boolean originalUberJarEnabled = services.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
        try {
            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);

            // Fully qualified uber jar URI.
            Element qualifiedXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
            JobConf qualifiedConf = executor.createBaseHadoopConf(context, qualifiedXml);
            executor.setupActionConf(qualifiedConf, context, qualifiedXml, getFsTestCaseDir());
            assertEquals(getNameNodeUri() + "/app/job.jar", qualifiedConf.get("oozie.mapreduce.uber.jar"));
            assertEquals(getNameNodeUri() + "/app/job.jar", executor.createLauncherConf(getFileSystem(), context, action, qualifiedXml, qualifiedConf).getJar());

            // Absolute path without a scheme: expected to be qualified with the name node URI.
            Element absoluteXml = createUberJarActionXML("/app/job.jar", "");
            JobConf absoluteConf = executor.createBaseHadoopConf(context, absoluteXml);
            executor.setupActionConf(absoluteConf, context, absoluteXml, getFsTestCaseDir());
            assertEquals(getNameNodeUri() + "/app/job.jar", absoluteConf.get("oozie.mapreduce.uber.jar"));
            assertEquals(getNameNodeUri() + "/app/job.jar", executor.createLauncherConf(getFileSystem(), context, action, absoluteXml, absoluteConf).getJar());

            // Relative path: expected to be resolved against the application path passed in.
            Element relativeXml = createUberJarActionXML("job.jar", "");
            JobConf relativeConf = executor.createBaseHadoopConf(context, relativeXml);
            executor.setupActionConf(relativeConf, context, relativeXml, getFsTestCaseDir());
            assertEquals(getFsTestCaseDir() + "/job.jar", relativeConf.get("oozie.mapreduce.uber.jar"));
            assertEquals(getFsTestCaseDir() + "/job.jar", executor.createLauncherConf(getFileSystem(), context, action, relativeXml, relativeConf).getJar());

            // Streaming action: the uber jar property is expected to be blanked out.
            Element streamingUberXml = createUberJarActionXML("job.jar", "<streaming></streaming>");
            JobConf streamingUberConf = executor.createBaseHadoopConf(context, streamingUberXml);
            executor.setupActionConf(streamingUberConf, context, streamingUberXml, getFsTestCaseDir());
            assertEquals("", streamingUberConf.get("oozie.mapreduce.uber.jar"));
            assertNull(executor.createLauncherConf(getFileSystem(), context, action, streamingUberXml, streamingUberConf).getJar());

            // Pipes action: same expectation as streaming.
            Element pipesUberXml = createUberJarActionXML("job.jar", "<pipes></pipes>");
            JobConf pipesUberConf = executor.createBaseHadoopConf(context, pipesUberXml);
            executor.setupActionConf(pipesUberConf, context, pipesUberXml, getFsTestCaseDir());
            assertEquals("", pipesUberConf.get("oozie.mapreduce.uber.jar"));
            assertNull(executor.createLauncherConf(getFileSystem(), context, action, pipesUberXml, pipesUberConf).getJar());

            // No uber jar property at all: nothing is set.
            Element noUberXml = XmlUtils.parseXml("<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node></map-reduce>");
            JobConf noUberConf = executor.createBaseHadoopConf(context, noUberXml);
            executor.setupActionConf(noUberConf, context, noUberXml, getFsTestCaseDir());
            assertNull(noUberConf.get("oozie.mapreduce.uber.jar"));
            assertNull(executor.createLauncherConf(getFileSystem(), context, action, noUberXml, noUberConf).getJar());

            // With uber jars disabled, specifying one must be rejected with MR003.
            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
            try {
                Element disabledXml = createUberJarActionXML(getNameNodeUri() + "/app/job.jar", "");
                executor.setupActionConf(executor.createBaseHadoopConf(context, disabledXml), context, disabledXml, getFsTestCaseDir());
                fail("ActionExecutorException expected because uber jars are disabled");
            } catch (ActionExecutorException e) {
                assertEquals("MR003", e.getErrorCode());
                assertEquals(ActionExecutorException.ErrorType.ERROR, e.getErrorType());
                assertTrue(e.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
                assertTrue(e.getMessage().contains("oozie.mapreduce.uber.jar"));
            }
        } finally {
            // Always put the flag back, even when an assertion above fails.
            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarEnabled);
        }

        // Streaming section is translated into oozie.streaming.* properties.
        Element streamingXml = XmlUtils.parseXml("<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node><streaming><mapper>M</mapper><reducer>R</reducer><record-reader>RR</record-reader><record-reader-mapping>RRM1=1</record-reader-mapping><record-reader-mapping>RRM2=2</record-reader-mapping><env>e=E</env><env>ee=EE</env></streaming><configuration><property><name>mapred.input.dir</name><value>IN</value></property><property><name>mapred.output.dir</name><value>OUT</value></property></configuration></map-reduce>");
        JobConf streamingConf = executor.createBaseHadoopConf(context, streamingXml);
        executor.setupActionConf(streamingConf, context, streamingXml, getFsTestCaseDir());
        assertEquals("M", streamingConf.get("oozie.streaming.mapper"));
        assertEquals("R", streamingConf.get("oozie.streaming.reducer"));
        assertEquals("RR", streamingConf.get("oozie.streaming.record-reader"));
        assertEquals("2", streamingConf.get("oozie.streaming.record-reader-mapping.size"));
        assertEquals("2", streamingConf.get("oozie.streaming.env.size"));

        // Pipes section is translated into oozie.pipes.* properties.
        Element pipesXml = XmlUtils.parseXml("<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node><pipes><map>M</map><reduce>R</reduce><inputformat>IF</inputformat><partitioner>P</partitioner><writer>W</writer><program>PP</program></pipes><configuration><property><name>mapred.input.dir</name><value>IN</value></property><property><name>mapred.output.dir</name><value>OUT</value></property></configuration></map-reduce>");
        JobConf pipesConf = executor.createBaseHadoopConf(context, pipesXml);
        executor.setupActionConf(pipesConf, context, pipesXml, getFsTestCaseDir());
        assertEquals("M", pipesConf.get("oozie.pipes.map"));
        assertEquals("R", pipesConf.get("oozie.pipes.reduce"));
        assertEquals("IF", pipesConf.get("oozie.pipes.inputformat"));
        assertEquals("P", pipesConf.get("oozie.pipes.partitioner"));
        assertEquals("W", pipesConf.get("oozie.pipes.writer"));
        assertEquals(getFsTestCaseDir() + "/PP", pipesConf.get("oozie.pipes.program"));
    }

    /**
     * Uploads a jar containing {@link MapperReducerForTest} to {@code lib/test.jar} under the
     * application path and returns a context for the first action of a freshly created
     * "mr-action" base workflow.
     *
     * @param str  name assigned to the action
     * @param str2 action configuration XML assigned to the action
     * @return a context wrapping the workflow and its configured action
     * @throws Exception if the jar cannot be created or uploaded, or the workflow cannot be built
     */
    protected ActionExecutorTestCase.Context createContext(String str, String str2) throws Exception {
        JavaActionExecutor typeSource = new JavaActionExecutor();
        Path libJar = new Path("lib/test.jar");

        // Build the test jar locally, then copy it into the application's lib directory.
        InputStream localJarStream = new FileInputStream(
                IOUtils.createJar(new File(getTestCaseDir()), "test.jar", new Class[]{MapperReducerForTest.class}));
        IOUtils.copyStream(localJarStream, getFileSystem().create(new Path(getAppPath(), "lib/test.jar")));

        XConfiguration protoConf = new XConfiguration();
        protoConf.set("user.name", getTestUser());
        protoConf.setStrings("oozie.wf.application.lib", new String[]{libJar.toString()});

        WorkflowJobBean workflow = createBaseWorkflow(protoConf, "mr-action");
        WorkflowActionBean action = (WorkflowActionBean) workflow.getActions().get(0);
        action.setName(str);
        action.setType(typeSource.getType());
        action.setConf(str2);
        return new ActionExecutorTestCase.Context(workflow, action);
    }

    /**
     * Same preparation as {@code createContext}, but the workflow is created through
     * {@code createBaseWorkflowWithCredentials} and the action is tagged with
     * {@link MapperReducerCredentialsForTest#TEST_CRED}.
     *
     * @param str  name assigned to the action
     * @param str2 action configuration XML assigned to the action
     * @return a context wrapping the credential-enabled workflow and its configured action
     * @throws Exception if the jar cannot be created or uploaded, or the workflow cannot be built
     */
    protected ActionExecutorTestCase.Context createContextWithCredentials(String str, String str2) throws Exception {
        JavaActionExecutor typeSource = new JavaActionExecutor();
        Path libJar = new Path("lib/test.jar");

        // Build the test jar locally, then copy it into the application's lib directory.
        InputStream localJarStream = new FileInputStream(
                IOUtils.createJar(new File(getTestCaseDir()), "test.jar", new Class[]{MapperReducerForTest.class}));
        IOUtils.copyStream(localJarStream, getFileSystem().create(new Path(getAppPath(), "lib/test.jar")));

        XConfiguration protoConf = new XConfiguration();
        protoConf.set("user.name", getTestUser());
        protoConf.setStrings("oozie.wf.application.lib", new String[]{libJar.toString()});

        WorkflowJobBean workflow = createBaseWorkflowWithCredentials(protoConf, "mr-action");
        WorkflowActionBean action = (WorkflowActionBean) workflow.getActions().get(0);
        action.setName(str);
        action.setType(typeSource.getType());
        action.setConf(str2);
        action.setCred(MapperReducerCredentialsForTest.TEST_CRED);
        return new ActionExecutorTestCase.Context(workflow, action);
    }

    /**
     * Prepares the action directory, submits the launcher through a new
     * {@link MapReduceActionExecutor}, and looks up the resulting launcher job.
     *
     * <p>The job client configuration is rebuilt from the action's own XML (its
     * {@code configuration}, {@code job-tracker} and {@code name-node} elements) plus the user
     * from the context's proto action conf and the test group.
     *
     * @param context context whose action is submitted
     * @return the running launcher job; asserted to be non-null
     * @throws Exception if submission or the job lookup fails
     */
    protected RunningJob submitAction(ActionExecutorTestCase.Context context) throws Exception {
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        WorkflowAction action = context.getAction();
        executor.prepareActionDir(getFileSystem(), context);
        executor.submitLauncher(getFileSystem(), context, action);

        // Submission must have populated the external identifiers on the action.
        String launcherId = action.getExternalId();
        String trackerUri = action.getTrackerUri();
        String consoleUrl = action.getConsoleUrl();
        assertNotNull(launcherId);
        assertNotNull(trackerUri);
        assertNotNull(consoleUrl);

        Element actionXml = XmlUtils.parseXml(action.getConf());
        XConfiguration clientSettings = new XConfiguration(
                new StringReader(XmlUtils.prettyPrint(actionXml.getChild("configuration")).toString()));
        clientSettings.set("mapred.job.tracker", actionXml.getChildTextTrim("job-tracker"));
        clientSettings.set("fs.default.name", actionXml.getChildTextTrim("name-node"));
        clientSettings.set("user.name", context.getProtoActionConf().get("user.name"));
        clientSettings.set("group.name", getTestGroup());
        clientSettings.set("mapreduce.framework.name", "yarn");

        HadoopAccessorService hadoopAccess = Services.get().get(HadoopAccessorService.class);
        JobConf jobConf = hadoopAccess.createJobConf(trackerUri);
        XConfiguration.copy(clientSettings, jobConf);
        String user = jobConf.get("user.name");
        RunningJob launcherJob = hadoopAccess.createJobClient(user, jobConf).getJob(JobID.forName(launcherId));
        assertNotNull(launcherJob);
        return launcherJob;
    }

    /**
     * Submits an action, waits for the launcher job and then for the job it swapped to, and
     * verifies the executor's {@code check}/{@code end} results.
     *
     * @param str  action name passed to {@code createContext}
     * @param str2 action configuration XML
     * @return the ID of the second (post id-swap) Hadoop job, as a string
     * @throws Exception if submission, polling or any executor call fails
     */
    private String _testSubmit(String str, String str2) throws Exception {
        ActionExecutorTestCase.Context context = createContext(str, str2);
        final RunningJob launcherJob = submitAction(context);
        String launcherId = context.getAction().getExternalId();
        waitFor(240000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return launcherJob.isComplete();
            }
        });
        assertTrue(launcherJob.isSuccessful());
        assertTrue(LauncherMapper.hasIdSwap(launcherJob));

        // After the first check the action's external ID must differ from the launcher's ID.
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        executor.check(context, context.getAction());
        assertFalse(launcherId.equals(context.getAction().getExternalId()));

        JobConf jobConf = executor.createBaseHadoopConf(context, XmlUtils.parseXml(str2));
        String user = jobConf.get("user.name");
        final RunningJob swappedJob = Services.get().get(HadoopAccessorService.class)
                .createJobClient(user, jobConf)
                .getJob(JobID.forName(context.getAction().getExternalId()));
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return swappedJob.isComplete();
            }
        });
        assertTrue(swappedJob.isSuccessful());

        executor.check(context, context.getAction());
        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
        assertNull(context.getAction().getData());

        executor.end(context, context.getAction());
        assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
        assertNotNull(context.getVar("hadoop.counters"));
        assertTrue(context.getVar("hadoop.counters").contains("Counter"));
        assertNull(context.getExternalChildIDs());
        return swappedJob.getID().toString();
    }

    /**
     * Credential-enabled variant of the submit test: submits the action, waits for the launcher
     * and the swapped job, verifies {@code check}/{@code end}, and finally asserts that
     * {@link MapperReducerCredentialsForTest#hasCredentials} reports credentials for the job.
     *
     * @param str  action name; now forwarded to {@code createContextWithCredentials} instead of
     *             being ignored in favour of a hard-coded "map-reduce"
     * @param str2 action configuration XML
     * @throws Exception if submission, polling or any executor call fails
     */
    private void _testSubmitWithCredentials(String str, String str2) throws Exception {
        ActionExecutorTestCase.Context context = createContextWithCredentials(str, str2);
        final RunningJob launcherJob = submitAction(context);
        String launcherId = context.getAction().getExternalId();
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return launcherJob.isComplete();
            }
        });
        assertTrue(launcherJob.isSuccessful());
        assertTrue(LauncherMapper.hasIdSwap(launcherJob));

        // After the first check the action's external ID must differ from the launcher's ID.
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        executor.check(context, context.getAction());
        assertFalse(launcherId.equals(context.getAction().getExternalId()));

        JobConf jobConf = executor.createBaseHadoopConf(context, XmlUtils.parseXml(str2));
        String user = jobConf.get("user.name");
        final RunningJob swappedJob = Services.get().get(HadoopAccessorService.class)
                .createJobClient(user, jobConf)
                .getJob(JobID.forName(context.getAction().getExternalId()));
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return swappedJob.isComplete();
            }
        });
        assertTrue(swappedJob.isSuccessful());

        executor.check(context, context.getAction());
        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
        assertNull(context.getAction().getData());

        executor.end(context, context.getAction());
        assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
        assertTrue(MapperReducerCredentialsForTest.hasCredentials(swappedJob));
    }

    /**
     * Builds a configuration that uses {@link MapperReducerForTest} as both mapper and reducer.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @return the populated configuration
     */
    protected XConfiguration getMapReduceConfig(String str, String str2) {
        final String mapperReducerClass = MapperReducerForTest.class.getName();
        XConfiguration conf = new XConfiguration();
        conf.set("mapred.mapper.class", mapperReducerClass);
        conf.set("mapred.reducer.class", mapperReducerClass);
        conf.set("mapred.input.dir", str);
        conf.set("mapred.output.dir", str2);
        return conf;
    }

    /**
     * Builds a configuration that uses {@link MapperReducerCredentialsForTest} as the mapper and
     * {@link MapperReducerForTest} as the reducer.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @return the populated configuration
     */
    protected XConfiguration getMapReduceCredentialsConfig(String str, String str2) {
        final String mapperClass = MapperReducerCredentialsForTest.class.getName();
        final String reducerClass = MapperReducerForTest.class.getName();
        XConfiguration conf = new XConfiguration();
        conf.set("mapred.mapper.class", mapperClass);
        conf.set("mapred.reducer.class", reducerClass);
        conf.set("mapred.input.dir", str);
        conf.set("mapred.output.dir", str2);
        return conf;
    }

    /**
     * Builds a configuration that uses {@link MapperReducerUberJarForTest} as mapper and reducer
     * and points {@code oozie.mapreduce.uber.jar} at a freshly created and uploaded uber jar.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @return the populated configuration
     * @throws Exception if the uber jar cannot be created or uploaded
     */
    protected XConfiguration getMapReduceUberJarConfig(String str, String str2) throws Exception {
        XConfiguration conf = new XConfiguration();
        conf.set("mapred.mapper.class", MapperReducerUberJarForTest.class.getName());
        conf.set("mapred.reducer.class", MapperReducerUberJarForTest.class.getName());
        conf.set("mapred.input.dir", str);
        conf.set("mapred.output.dir", str2);

        Path uploadedUberJar = createAndUploadUberJar();
        String uberJarUri = uploadedUberJar.toUri().toString();
        conf.set("oozie.mapreduce.uber.jar", uberJarUri);
        return conf;
    }

    /**
     * Writes a two-line input file and runs a plain map-reduce action through {@code _testSubmit}.
     *
     * @throws Exception if the input cannot be written or the submission fails
     */
    public void testMapReduce() throws Exception {
        FileSystem fs = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");

        OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        } finally {
            // Close even if a write fails so the file-system stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
        _testSubmit("map-reduce", actionXml);
    }

    /**
     * Writes a two-line input file and runs a map-reduce action with the credentials
     * configuration through {@code _testSubmitWithCredentials}.
     *
     * @throws Exception if the input cannot be written or the submission fails
     */
    public void testMapReduceWithCredentials() throws Exception {
        FileSystem fs = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");

        OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        } finally {
            // Close even if a write fails so the file-system stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + getMapReduceCredentialsConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
        _testSubmitWithCredentials("map-reduce", actionXml);
    }

    /**
     * Creates an uber jar in the local test-case directory and moves it into the application path
     * on the test file system.
     *
     * @return the destination path of the uber jar under the application path
     * @throws Exception if the jar cannot be built or moved
     */
    protected Path createAndUploadUberJar() throws Exception {
        Path localUberJar = makeUberJarWithLib(getTestCaseDir());
        Path uploadedUberJar = new Path(getAppPath(), localUberJar.getName());
        getFileSystem().moveFromLocalFile(localUberJar, uploadedUberJar);

        // Best-effort cleanup in case the move left the local copy behind. The local file is
        // addressed by the URI's path component (as in makeUberJarWithLib), not by the URI string,
        // so a scheme-qualified path would still resolve to the real file.
        File localLeftover = new File(localUberJar.toUri().getPath());
        if (localLeftover.exists()) {
            localLeftover.delete();
        }
        return uploadedUberJar;
    }

    /**
     * Creates {@code uber.jar} in the given local directory containing two nested jars,
     * {@code lib/lib1.jar} and {@code lib/lib2.jar}.
     *
     * @param str local directory in which the uber jar (and the temporary nested jars) are written
     * @return the path of the created uber jar
     * @throws Exception if any of the jars cannot be written
     */
    private Path makeUberJarWithLib(String str) throws Exception {
        Path uberJar = new Path(str, "uber.jar");
        File firstLib = new File(new Path(str, "lib1.jar").toUri().getPath());
        File secondLib = new File(new Path(str, "lib2.jar").toUri().getPath());

        JarOutputStream uberJarOut = new JarOutputStream(new FileOutputStream(new File(uberJar.toUri().getPath())));
        try {
            createAndAddJarToJar(uberJarOut, firstLib);
            createAndAddJarToJar(uberJarOut, secondLib);
        } finally {
            // Close even when adding a nested jar fails, so the file handle is not leaked.
            uberJarOut.close();
        }
        return uberJar;
    }

    /**
     * Writes a small jar to {@code file} (one empty entry named {@code <file name>.inside}),
     * copies its bytes into the outer jar as entry {@code lib/<file name>}, and deletes the
     * temporary file.
     *
     * @param jarOutputStream open outer jar that receives the new entry; it is not closed here
     * @param file            temporary location for the nested jar; deleted before returning,
     *                        including when an error occurs
     * @throws Exception if writing the nested jar or copying it into the outer jar fails
     */
    private void createAndAddJarToJar(JarOutputStream jarOutputStream, File file) throws Exception {
        try {
            // Step 1: create the nested jar on disk.
            JarOutputStream nestedJar = new JarOutputStream(new FileOutputStream(file));
            try {
                nestedJar.putNextEntry(new ZipEntry(file.getName() + ".inside"));
                nestedJar.closeEntry();
            } finally {
                nestedJar.close();
            }

            // Step 2: stream the nested jar's bytes into a new entry of the outer jar.
            jarOutputStream.putNextEntry(new ZipEntry("lib/" + file.getName()));
            FileInputStream nestedJarIn = new FileInputStream(file);
            try {
                byte[] buffer = new byte[1024];
                int read;
                while ((read = nestedJarIn.read(buffer)) != -1) {
                    jarOutputStream.write(buffer, 0, read);
                }
            } finally {
                nestedJarIn.close();
            }
            jarOutputStream.closeEntry();
        } finally {
            // The nested jar is only a staging file; remove it whether or not the copy succeeded.
            file.delete();
        }
    }

    /**
     * Runs a map-reduce action configured with an uber jar and scans the job's {@code part-*}
     * output files for lines that reference the two nested jars.
     *
     * <p>A line counts as a match for a nested jar when it either ends with the
     * {@code jobcache/<job id>/jars/lib/<jar>} suffix or matches the
     * {@code appcache/application_<id>/filecache/.../uber.jar/lib/<jar>} pattern. The lib2 suffix
     * previously pointed at {@code lib1.jar}, which allowed the lib2 assertion to be satisfied by
     * a lib1 line; it now points at {@code lib2.jar}.
     *
     * @throws Exception if the input cannot be written, the submission fails, or the output
     *                   cannot be read
     */
    public void _testMapReduceWithUberJar() throws Exception {
        FileSystem fs = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");

        OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        } finally {
            // Close even if a write fails so the file-system stream is not leaked.
            writer.close();
        }

        String jobId = _testSubmit("map-reduce", "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>");

        boolean foundLib1 = false;
        String lib1Suffix = "jobcache/" + jobId + "/jars/lib/lib1.jar";
        Pattern lib1Pattern = Pattern.compile(".*appcache/application_" + jobId.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib1.jar");
        boolean foundLib2 = false;
        String lib2Suffix = "jobcache/" + jobId + "/jars/lib/lib2.jar";
        Pattern lib2Pattern = Pattern.compile(".*appcache/application_" + jobId.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib2.jar");

        for (FileStatus status : getFileSystem().listStatus(outputDir)) {
            Path candidate = status.getPath();
            if (!getFileSystem().isFile(candidate) || !candidate.getName().startsWith("part-")) {
                continue;
            }
            FSDataInputStream partStream = getFileSystem().open(candidate);
            Scanner scanner = new Scanner((InputStream) partStream);
            try {
                while (scanner.hasNextLine()) {
                    String line = scanner.nextLine();
                    foundLib1 = foundLib1 || line.endsWith(lib1Suffix) || lib1Pattern.matcher(line).matches();
                    foundLib2 = foundLib2 || line.endsWith(lib2Suffix) || lib2Pattern.matcher(line).matches();
                }
            } finally {
                scanner.close();
                partStream.close();
            }
        }
        assertTrue("lib/lib1.jar should have been unzipped from the uber jar and added to the classpath but was not", foundLib1);
        assertTrue("lib/lib2.jar should have been unzipped from the uber jar and added to the classpath but was not", foundLib2);
    }

    /**
     * Runs the uber-jar scenario with {@code oozie.action.mapreduce.uber.jar.enable} forced to
     * {@code false} and, if an {@link ActionExecutorException} is raised, verifies that it is the
     * MR003 error mentioning both uber-jar properties. The original flag value is always restored.
     *
     * @throws Exception any exception other than {@link ActionExecutorException} raised by the
     *                   scenario
     */
    public void testMapReduceWithUberJarDisabled() throws Exception {
        Services services = Services.get();
        boolean originalUberJarEnabled = services.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
        try {
            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", false);
            // NOTE(review): if this call completes without throwing, the test passes without
            // having verified anything - confirm whether a fail() is intended here.
            _testMapReduceWithUberJar();
        } catch (ActionExecutorException aee) {
            assertEquals("MR003", aee.getErrorCode());
            assertEquals(ActionExecutorException.ErrorType.ERROR, aee.getErrorType());
            assertTrue(aee.getMessage().contains("oozie.action.mapreduce.uber.jar.enable"));
            assertTrue(aee.getMessage().contains("oozie.mapreduce.uber.jar"));
        } finally {
            services.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", originalUberJarEnabled);
        }
    }

    /**
     * Builds the configuration used by the streaming test: input and output directories only.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @return the populated configuration
     */
    protected XConfiguration getStreamingConfig(String str, String str2) {
        final XConfiguration streamingConf = new XConfiguration();
        streamingConf.set("mapred.input.dir", str);
        streamingConf.set("mapred.output.dir", str2);
        return streamingConf;
    }

    /**
     * Uploads the jar containing {@link StreamJob}, writes a two-line input file, and runs a
     * streaming action ({@code cat} mapper, {@code wc} reducer) through {@code _testSubmit}.
     *
     * @throws Exception if the jar or input cannot be written or the submission fails
     */
    public void testStreaming() throws Exception {
        FileSystem fs = getFileSystem();
        Path streamingJar = new Path(getFsTestCaseDir(), "jar/hadoop-streaming.jar");
        IOUtils.copyStream(new FileInputStream(ClassUtils.findContainingJar(StreamJob.class)), fs.create(new Path(getAppPath(), streamingJar)));

        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        } finally {
            // Close even if a write fails so the file-system stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>      <streaming>        <mapper>cat</mapper>        <reducer>wc</reducer>      </streaming>"
                + getStreamingConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + streamingJar + "</file></map-reduce>";
        _testSubmit("streaming", actionXml);
    }

    /**
     * Builds the configuration used by the pipes test: the Java record reader and record writer
     * flags are enabled and the input and output directories are set.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @return the populated configuration
     */
    protected XConfiguration getPipesConfig(String str, String str2) {
        final XConfiguration pipesConf = new XConfiguration();
        pipesConf.setBoolean("hadoop.pipes.java.recordreader", true);
        pipesConf.setBoolean("hadoop.pipes.java.recordwriter", true);
        pipesConf.set("mapred.input.dir", str);
        pipesConf.set("mapred.output.dir", str2);
        return pipesConf;
    }

    /**
     * Builds a configuration with input and output directories and an explicit
     * {@code oozie.action.external.stats.write} value.
     *
     * @param str  value for {@code mapred.input.dir}
     * @param str2 value for {@code mapred.output.dir}
     * @param str3 value for {@code oozie.action.external.stats.write}
     * @return the populated configuration
     */
    private XConfiguration getOozieActionExternalStatsWriteProperty(String str, String str2, String str3) {
        final XConfiguration statsConf = new XConfiguration();
        statsConf.set("mapred.input.dir", str);
        statsConf.set("mapred.output.dir", str2);
        statsConf.set("oozie.action.external.stats.write", str3);
        return statsConf;
    }

    /**
     * Uploads the {@code wordcount-simple} binary from the classpath, writes a two-line input
     * file, and runs a pipes action through {@code _testSubmit}. The test is skipped (with a
     * message on standard output) when the binary is not on the classpath.
     *
     * @throws Exception if the binary or input cannot be written or the submission fails
     */
    public void testPipes() throws Exception {
        Path program = new Path(getFsTestCaseDir(), "wordcount-simple");
        FileSystem fs = getFileSystem();
        InputStream binary = Thread.currentThread().getContextClassLoader().getResourceAsStream("wordcount-simple");
        if (binary == null) {
            System.out.println("SKIPPING TEST: TestMapReduceActionExecutor.testPipes(), binary 'wordcount-simple' not available in the classpath");
            return;
        }
        IOUtils.copyStream(binary, fs.create(program));

        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        } finally {
            // Close even if a write fails so the file-system stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>      <pipes>        <program>" + program + "#wordcount-simple</program>      </pipes>"
                + getPipesConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + program + "</file></map-reduce>";
        _testSubmit("pipes", actionXml);
    }

    /**
     * Runs a map-reduce action with {@code oozie.action.external.stats.write} set to
     * {@code "true"} and asserts that execution stats are available on the context.
     *
     * <p>Flow: write a two-line input file, submit the action, wait for the launcher job,
     * call {@code check()} and assert that the action's external ID changed, wait for the
     * job registered under the new external ID, then call {@code check()} and
     * {@code end()} and assert on status, execution stats, external child IDs and the
     * {@code hadoop.counters} variable.
     *
     * @throws Exception if any file-system, submission or executor call fails
     */
    public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() throws Exception {
        FileSystem fileSystem = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        }
        finally {
            // Close even when a write fails so the underlying output stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "true").toXmlString(false)
                + "</map-reduce>";
        ActionExecutorTestCase.Context context = createContext("map-reduce", actionXml);

        // Phase 1: the launcher job.
        final RunningJob launcherJob = submitAction(context);
        String launcherId = context.getAction().getExternalId();
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return launcherJob.isComplete();
            }
        });
        assertTrue(launcherJob.isSuccessful());
        assertTrue(LauncherMapper.hasIdSwap(launcherJob));

        // After the first check() the action must point at a different external ID.
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        executor.check(context, context.getAction());
        assertFalse(launcherId.equals(context.getAction().getExternalId()));

        // Phase 2: the job registered under the new external ID.
        JobConf jobConf = executor.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
        String user = jobConf.get("user.name");
        final RunningJob mrJob = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf).getJob(JobID.forName(context.getAction().getExternalId()));
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return mrJob.isComplete();
            }
        });
        assertTrue(mrJob.isSuccessful());

        executor.check(context, context.getAction());
        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
        assertNull(context.getAction().getData());

        executor.end(context, context.getAction());
        assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());

        // Stats writing was requested, so execution stats must be present.
        assertNotNull(context.getExecutionStats());
        assertTrue(context.getExecutionStats().contains("ACTION_TYPE"));
        assertTrue(context.getExecutionStats().contains("Counter"));
        assertNull(context.getExternalChildIDs());
        assertNotNull(context.getVar("hadoop.counters"));
        assertTrue(context.getVar("hadoop.counters").contains("Counter"));
    }

    /**
     * Runs a map-reduce action with {@code oozie.action.external.stats.write} set to
     * {@code "false"} and asserts that no execution stats are recorded on the context,
     * while the {@code hadoop.counters} variable is still populated.
     *
     * <p>Flow: write a two-line input file, submit the action, wait for the launcher job,
     * call {@code check()} and assert that the action's external ID changed, wait for the
     * job registered under the new external ID, then call {@code check()} and
     * {@code end()} and assert on the final state.
     *
     * @throws Exception if any file-system, submission or executor call fails
     */
    public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() throws Exception {
        FileSystem fileSystem = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        }
        finally {
            // Close even when a write fails so the underlying output stream is not leaked.
            writer.close();
        }

        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false").toXmlString(false)
                + "</map-reduce>";
        ActionExecutorTestCase.Context context = createContext("map-reduce", actionXml);

        // Phase 1: the launcher job.
        final RunningJob launcherJob = submitAction(context);
        String launcherId = context.getAction().getExternalId();
        waitFor(240000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return launcherJob.isComplete();
            }
        });
        assertTrue(launcherJob.isSuccessful());
        assertTrue(LauncherMapper.hasIdSwap(launcherJob));

        // After the first check() the action must point at a different external ID.
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        executor.check(context, context.getAction());
        assertFalse(launcherId.equals(context.getAction().getExternalId()));

        // Phase 2: the job registered under the new external ID.
        JobConf jobConf = executor.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
        String user = jobConf.get("user.name");
        final RunningJob mrJob = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf).getJob(JobID.forName(context.getAction().getExternalId()));
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return mrJob.isComplete();
            }
        });
        assertTrue(mrJob.isSuccessful());

        executor.check(context, context.getAction());
        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
        assertNull(context.getAction().getData());

        executor.end(context, context.getAction());
        assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());

        // Stats writing was disabled, so no execution stats are expected.
        assertNull(context.getExecutionStats());
        assertNull(context.getExternalChildIDs());
        assertNotNull(context.getVar("hadoop.counters"));
        assertTrue(context.getVar("hadoop.counters").contains("Counter"));
    }

    /**
     * Verifies that job names set in the action configuration are applied to the
     * submitted jobs.
     *
     * <p>{@code oozie.launcher.mapred.job.name} is set to {@code MapReduceLauncherTest}
     * and {@code mapred.job.name} to {@code MapReduceTest}. The test asserts the first
     * name on the job returned by {@code submitAction} and the second on the job looked
     * up by the action's external ID after {@code check()}.
     *
     * @throws Exception if any file-system, submission or executor call fails
     */
    public void testSetMapredJobName() throws Exception {
        FileSystem fileSystem = getFileSystem();
        Path inputDir = new Path(getFsTestCaseDir(), "input");
        Path outputDir = new Path(getFsTestCaseDir(), "output");
        OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(new Path(inputDir, "data.txt")));
        try {
            writer.write("dummy\n");
            writer.write("dummy\n");
        }
        finally {
            // Close even when a write fails so the underlying output stream is not leaked.
            writer.close();
        }

        XConfiguration mapReduceConfig = getMapReduceConfig(inputDir.toString(), outputDir.toString());
        mapReduceConfig.set("oozie.launcher.mapred.job.name", "MapReduceLauncherTest");
        mapReduceConfig.set("mapred.job.name", "MapReduceTest");
        String actionXml = "<map-reduce><job-tracker>" + getJobTrackerUri() + "</job-tracker><name-node>" + getNameNodeUri() + "</name-node>"
                + mapReduceConfig.toXmlString(false) + "</map-reduce>";
        ActionExecutorTestCase.Context context = createContext("map-reduce", actionXml);

        // Phase 1: the launcher job and its configured name.
        final RunningJob launcherJob = submitAction(context);
        String launcherId = context.getAction().getExternalId();
        waitFor(240000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return launcherJob.isComplete();
            }
        });
        assertTrue(launcherJob.isSuccessful());
        assertTrue(LauncherMapper.hasIdSwap(launcherJob));
        System.out.println("Launcher job name: " + launcherJob.getJobName());
        // assertEquals reports the actual name on failure and does not NPE on a null name.
        assertEquals("MapReduceLauncherTest", launcherJob.getJobName());

        // After the first check() the action must point at a different external ID.
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        executor.check(context, context.getAction());
        assertFalse(launcherId.equals(context.getAction().getExternalId()));

        // Phase 2: the job registered under the new external ID.
        JobConf jobConf = executor.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
        final RunningJob mrJob = Services.get().get(HadoopAccessorService.class).createJobClient(jobConf.get("user.name"), jobConf).getJob(JobID.forName(context.getAction().getExternalId()));
        waitFor(120000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return mrJob.isComplete();
            }
        });
        assertTrue(mrJob.isSuccessful());

        executor.check(context, context.getAction());
        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
        assertNull(context.getAction().getData());

        executor.end(context, context.getAction());
        assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());

        System.out.println("Mapred job name: " + mrJob.getJobName());
        assertEquals("MapReduceTest", mrJob.getJobName());
        assertNull(context.getExecutionStats());
        assertNull(context.getExternalChildIDs());
        assertNotNull(context.getVar("hadoop.counters"));
        assertTrue(context.getVar("hadoop.counters").contains("Counter"));
    }

    /**
     * Checks the default share-lib name reported for a map-reduce action element:
     * {@code null} for a bare element, and {@code "mapreduce-streaming"} once a
     * {@code streaming} child element has been added.
     */
    public void testDefaultShareLibName() {
        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        Element actionElement = new Element("mapreduce");

        // Bare element: no default share lib.
        String bareName = executor.getDefaultShareLibName(actionElement);
        assertNull(bareName);

        // Same element with a <streaming> child.
        Element streamingChild = new Element("streaming");
        actionElement.addContent(streamingChild);
        String streamingName = executor.getDefaultShareLibName(actionElement);
        assertEquals("mapreduce-streaming", streamingName);
    }

    /**
     * Checks how comma-separated {@code <file>} and {@code <archive>} values are applied
     * to the job configuration.
     *
     * <p>Empty files are created both under the application path (referenced by relative
     * name) and under a {@code root} directory in the test-case directory (referenced by
     * full path). The action XML lists them comma-separated, with extra whitespace around
     * some separators. After {@code setupActionConf} and {@code setLibFilesArchives} the
     * test asserts that:
     * <ul>
     *   <li>symlinking is enabled in the distributed cache;</li>
     *   <li>the two {@code .jar} files are on the file classpath;</li>
     *   <li>the plain, {@code .so} and {@code .so.1} files are not on the file classpath;</li>
     *   <li>every listed file is among the cache files;</li>
     *   <li>both {@code .tar} archives are among the cache archives.</li>
     * </ul>
     * Paths are compared by the path component of their URIs only.
     *
     * @throws Exception if creating the files or configuring the action fails
     */
    public void testCommaSeparatedFilesAndArchives() throws Exception {
        Path rootDir = new Path(getFsTestCaseDir(), "root");

        // Each pair: one file relative to the app path, one under the root directory.
        Path jar = new Path("jar.jar");
        getFileSystem().create(new Path(getAppPath(), jar)).close();
        Path rootJar = new Path(rootDir, "rootJar.jar");
        getFileSystem().create(rootJar).close();

        Path file = new Path("file");
        getFileSystem().create(new Path(getAppPath(), file)).close();
        Path rootFile = new Path(rootDir, "rootFile");
        getFileSystem().create(rootFile).close();

        Path so = new Path("soFile.so");
        getFileSystem().create(new Path(getAppPath(), so)).close();
        Path rootSo = new Path(rootDir, "rootSoFile.so");
        getFileSystem().create(rootSo).close();

        Path so1 = new Path("soFile.so.1");
        getFileSystem().create(new Path(getAppPath(), so1)).close();
        Path rootSo1 = new Path(rootDir, "rootSoFile.so.1");
        getFileSystem().create(rootSo1).close();

        Path archive = new Path("archive.tar");
        getFileSystem().create(new Path(getAppPath(), archive)).close();
        Path rootArchive = new Path(rootDir, "rootArchive.tar");
        getFileSystem().create(rootArchive).close();

        // The irregular whitespace around some commas is part of the fixture.
        String actionXml = "<map-reduce>      <job-tracker>" + getJobTrackerUri() + "</job-tracker>"
                + "      <name-node>" + getNameNodeUri() + "</name-node>"
                + "      <main-class>CLASS</main-class>"
                + "      <file>" + jar.toString()
                + "," + rootJar.toString()
                + "," + file.toString()
                + ", " + rootFile.toString()
                + "  ," + so.toString()
                + "," + rootSo.toString()
                + "," + so1.toString()
                + "," + rootSo1.toString()
                + "</file>\n"
                + "      <archive>" + archive.toString() + ", " + rootArchive.toString() + " </archive>\n"
                + "</map-reduce>";
        Element actionElement = XmlUtils.parseXml(actionXml);
        ActionExecutorTestCase.Context context = createContext("map-reduce", actionXml);
        Path appPath = getAppPath();

        MapReduceActionExecutor executor = new MapReduceActionExecutor();
        JobConf jobConf = executor.createBaseHadoopConf(context, actionElement);
        executor.setupActionConf(jobConf, context, actionElement, appPath);
        executor.setLibFilesArchives(context, actionElement, appPath, jobConf);

        assertTrue(DistributedCache.getSymlink(jobConf));

        // Jars must be on the file classpath.
        Path[] classpathEntries = DistributedCache.getFileClassPaths(jobConf);
        Path[] expectedOnClasspath = {new Path(getAppPath(), jar), rootJar};
        for (Path expected : expectedOnClasspath) {
            String expectedPath = expected.toUri().getPath();
            boolean present = false;
            for (Path entry : classpathEntries) {
                if (expectedPath.equals(entry.toUri().getPath())) {
                    present = true;
                    break;
                }
            }
            assertTrue("file " + expectedPath + " not found in classpath", present);
        }

        // Non-jar files must not be on the file classpath.
        Path[] expectedOffClasspath = {
            new Path(getAppPath(), file), rootFile,
            new Path(getAppPath(), so), rootSo,
            new Path(getAppPath(), so1), rootSo1
        };
        for (Path unexpected : expectedOffClasspath) {
            String unexpectedPath = unexpected.toUri().getPath();
            boolean present = false;
            for (Path entry : classpathEntries) {
                if (unexpectedPath.equals(entry.toUri().getPath())) {
                    present = true;
                    break;
                }
            }
            assertFalse("file " + unexpectedPath + " found in classpath", present);
        }

        // Every listed file, jar or not, must be a cache file.
        URI[] cachedFiles = DistributedCache.getCacheFiles(jobConf);
        Path[] expectedCachedFiles = {
            new Path(getAppPath(), jar), rootJar,
            new Path(getAppPath(), file), rootFile,
            new Path(getAppPath(), so), rootSo,
            new Path(getAppPath(), so1), rootSo1
        };
        for (Path expected : expectedCachedFiles) {
            String expectedPath = expected.toUri().getPath();
            boolean present = false;
            for (URI cached : cachedFiles) {
                if (expectedPath.equals(cached.getPath())) {
                    present = true;
                    break;
                }
            }
            assertTrue("file " + expectedPath + " not found in cache", present);
        }

        // Both archives must be cache archives.
        URI[] cachedArchives = DistributedCache.getCacheArchives(jobConf);
        Path[] expectedCachedArchives = {new Path(getAppPath(), archive), rootArchive};
        for (Path expected : expectedCachedArchives) {
            String expectedPath = expected.toUri().getPath();
            boolean present = false;
            for (URI cached : cachedArchives) {
                if (expectedPath.equals(cached.getPath())) {
                    present = true;
                    break;
                }
            }
            assertTrue("archive " + expectedPath + " not found in cache", present);
        }
    }
}
