package org.apache.oozie.command.wf;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.oozie.DagEngine;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.ExtendedCallableQueueService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;

/* loaded from: input_file:org/apache/oozie/command/wf/TestSignalXCommand.class */
/**
 * Tests that drive workflows through fork/join, suspend-point and
 * single-threaded callable queue scenarios and check the resulting job and
 * action states.
 *
 * <p>Methods prefixed with an underscore are scenario bodies; each is run by a
 * {@code test*} method under both settings of
 * {@code oozie.workflow.parallel.fork.action.start}.
 */
public class TestSignalXCommand extends XDataTestCase {
    /** Configuration key toggled by the tests to run each scenario in both fork start modes. */
    private static final String PARALLEL_FORK_ACTION_START = "oozie.workflow.parallel.fork.action.start";

    /** Job property naming the workflow definition to run. */
    private static final String APP_PATH = "oozie.wf.application.path";

    /** Job property naming the submitting user. */
    private static final String USER_NAME = "user.name";

    /** Job property listing the nodes the suspend-point tests expect the job to suspend at. */
    private static final String SUSPEND_ON_NODES = "oozie.suspend.on.nodes";

    /** Services instance owned by the current test; destroyed in {@link #tearDown()}. */
    private Services services;

    /**
     * Starts a fresh {@link Services} instance and disables fork/join validation.
     *
     * @throws Exception if the base set-up or service initialisation fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        this.services = new Services();
        this.services.init();
        ConfigurationService.setBoolean("oozie.validate.ForkJoin", false);
    }

    /**
     * Destroys the services and always runs the base tear-down, even when
     * destroying the services fails.
     *
     * @throws Exception if destroying the services or the base tear-down fails
     */
    @Override
    public void tearDown() throws Exception {
        try {
            this.services.destroy();
        } finally {
            super.tearDown();
        }
    }

    /**
     * Runs {@link #_testJoinFail()} with parallel fork action start enabled, then disabled.
     *
     * @throws Exception if either run fails
     */
    public void testJoinFail() throws Exception {
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, true);
        _testJoinFail();
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, false);
        _testJoinFail();
    }

    /**
     * Runs {@link #_testSuspendPoints()} with parallel fork action start enabled,
     * then disabled, recreating the services in between.
     *
     * @throws Exception if either run fails
     */
    public void testSuspendPoints() throws Exception {
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, true);
        _testSuspendPoints();
        restartServices();
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, false);
        _testSuspendPoints();
    }

    /**
     * Runs {@link #_testSuspendPointsAll()} with parallel fork action start enabled,
     * then disabled, recreating the services in between.
     *
     * @throws Exception if either run fails
     */
    public void testSuspendPointsAll() throws Exception {
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, true);
        _testSuspendPointsAll();
        restartServices();
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, false);
        _testSuspendPointsAll();
    }

    /**
     * Submits and starts the {@code wf-fork.xml} workflow and asserts that nothing
     * logged by {@link SignalXCommand} during the following two seconds mentions
     * {@code EntityExistsException}.
     *
     * @throws Exception if deployment, submission or the assertion fails
     */
    public void _testJoinFail() throws Exception {
        Logger signalLogger = Logger.getLogger(SignalXCommand.class);
        ByteArrayOutputStream capturedLog = new ByteArrayOutputStream();
        WriterAppender appender = new WriterAppender(new SimpleLayout(), capturedLog);
        signalLogger.addAppender(appender);
        try {
            Path workflowXml = deployWorkflow("wf-fork.xml");

            DagEngine engine = new DagEngine("u");
            XConfiguration jobConf = new XConfiguration();
            jobConf.set(APP_PATH, workflowXml.toString());
            jobConf.set(USER_NAME, getTestUser());

            String jobId = engine.submitJob(jobConf, false);
            assertNotNull(jobId);
            engine.start(jobId);

            // Give the asynchronously executed commands time to log before inspecting the output.
            Thread.sleep(2000L);
            assertFalse(capturedLog.toString(StandardCharsets.UTF_8.name()).contains("EntityExistsException"));
        } finally {
            // This method runs twice per test; detach the appender so they do not
            // accumulate on the shared logger.
            signalLogger.removeAppender(appender);
        }
    }

    /**
     * Runs {@code wf-suspendpoints.xml} on a {@link LocalOozie} instance with an
     * explicit list of suspend nodes and checks, resume by resume, which actions
     * are in PREP and which are OK each time the job reports SUSPENDED, until it
     * reaches SUCCEEDED. Also checks that the actions existing at the first
     * suspension were created strictly between the instants sampled around
     * {@code client.start}.
     *
     * @throws Exception if the workflow cannot be run or an expectation is not met
     */
    public void _testSuspendPoints() throws Exception {
        this.services.destroy();
        LocalOozie.start();
        try {
            Path workflowXml = deployWorkflow("wf-suspendpoints.xml");
            OozieClient client = LocalOozie.getClient();
            Properties jobProps = client.createConfiguration();
            jobProps.setProperty(APP_PATH, workflowXml.toString());
            jobProps.setProperty(USER_NAME, getTestUser());
            // The list deliberately contains an unknown node name and stray whitespace.
            jobProps.setProperty(SUSPEND_ON_NODES,
                    "action1,nonexistant_action_name,decision1, action3,join1 ,fork1,action4b");

            String jobId = client.submit(jobProps);
            assertNotNull(jobId);
            WorkflowJob job = client.getJobInfo(jobId);
            assertEquals(WorkflowJob.Status.PREP, job.getStatus());

            long beforeStart = System.currentTimeMillis();
            client.start(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action1"},
                    new String[]{":start:"});
            long afterSuspend = System.currentTimeMillis();
            for (WorkflowAction action : client.getJobInfo(jobId).getActions()) {
                // getCreatedTime() is read through the bean type, so cast each action.
                WorkflowActionBean bean = (WorkflowActionBean) action;
                assertNotNull(bean.getCreatedTime());
                long created = bean.getCreatedTime().getTime();
                assertTrue(created > beforeStart && created < afterSuspend);
            }

            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"decision1"},
                    new String[]{":start:", "action1", "action2"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action3"},
                    new String[]{":start:", "action1", "action2", "decision1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"fork1"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action4a", "action4b", "action4c"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"join1"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1",
                        "action4a", "action4b", "action4c"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUCCEEDED,
                    new String[0],
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1",
                        "action4a", "action4b", "action4c", "join1", "end"});
        } finally {
            // Stop the embedded server even when an assertion above fails.
            LocalOozie.stop();
        }
    }

    /**
     * Same scenario as {@link #_testSuspendPoints()} but with the suspend list set
     * to {@code "*"}; the job is expected to report SUSPENDED before every node
     * after the start node, including {@code end}.
     *
     * @throws Exception if the workflow cannot be run or an expectation is not met
     */
    public void _testSuspendPointsAll() throws Exception {
        this.services.destroy();
        LocalOozie.start();
        try {
            Path workflowXml = deployWorkflow("wf-suspendpoints.xml");
            OozieClient client = LocalOozie.getClient();
            Properties jobProps = client.createConfiguration();
            jobProps.setProperty(APP_PATH, workflowXml.toString());
            jobProps.setProperty(USER_NAME, getTestUser());
            jobProps.setProperty(SUSPEND_ON_NODES, "*");

            String jobId = client.submit(jobProps);
            assertNotNull(jobId);
            WorkflowJob job = client.getJobInfo(jobId);
            assertEquals(WorkflowJob.Status.PREP, job.getStatus());

            client.start(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action1"},
                    new String[]{":start:"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action2"},
                    new String[]{":start:", "action1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"decision1"},
                    new String[]{":start:", "action1", "action2"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action3"},
                    new String[]{":start:", "action1", "action2", "decision1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"fork1"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"action4a", "action4b", "action4c"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"join1"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1",
                        "action4a", "action4b", "action4c"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUSPENDED,
                    new String[]{"end"},
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1",
                        "action4a", "action4b", "action4c", "join1"});
            client.resume(jobId);
            checkSuspendActions(job, client, jobId, WorkflowJob.Status.SUCCEEDED,
                    new String[0],
                    new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1",
                        "action4a", "action4b", "action4c", "join1", "end"});
        } finally {
            // Stop the embedded server even when an assertion above fails.
            LocalOozie.stop();
        }
    }

    /**
     * Waits up to 30 seconds for the job to reach {@code status}, then asserts
     * that every action of the job is either listed in {@code strArr} with status
     * PREP or listed in {@code strArr2} with status OK, and that each listed name
     * was matched by exactly as many actions as the arrays have entries.
     *
     * @param workflowJob job snapshot taken by the caller; not used by this check
     * @param oozieClient client used to poll the job
     * @param str         id of the job to inspect
     * @param status      job status to wait for and assert
     * @param strArr      names of actions expected to be in PREP
     * @param strArr2     names of actions expected to be OK
     * @throws Exception if polling fails or an expectation is not met
     */
    private void checkSuspendActions(WorkflowJob workflowJob, final OozieClient oozieClient, final String str, final WorkflowJob.Status status, String[] strArr, String[] strArr2) throws Exception {
        waitFor(30000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return oozieClient.getJobInfo(str).getStatus() == status;
            }
        });
        WorkflowJob current = oozieClient.getJobInfo(str);
        assertEquals(status, current.getStatus());

        int prepMatches = 0;
        int okMatches = 0;
        for (WorkflowAction action : current.getActions()) {
            String actionName = action.getName();
            // A name present in both arrays is treated as a PREP expectation only.
            if (containsName(strArr, actionName)) {
                assertEquals("action [" + actionName + "] had incorrect status", WorkflowAction.Status.PREP, action.getStatus());
                prepMatches++;
            } else if (containsName(strArr2, actionName)) {
                assertEquals("action [" + actionName + "] had incorrect status", WorkflowAction.Status.OK, action.getStatus());
                okMatches++;
            } else {
                fail("Unexpected action [" + actionName + "] with status [" + action.getStatus() + "]");
            }
        }
        assertEquals(strArr.length, prepMatches);
        assertEquals(strArr2.length, okMatches);
    }

    /**
     * Writes {@code str} followed by a line separator to the local file
     * identified by the URI {@code str2}, encoded as UTF-8.
     *
     * @param str  content to write
     * @param str2 URI string of the target file
     * @throws IOException if the file cannot be opened
     */
    private void writeToFile(String str, String str2) throws IOException {
        try (PrintWriter out = new PrintWriter(
                new OutputStreamWriter(new FileOutputStream(new File(URI.create(str2))), StandardCharsets.UTF_8))) {
            out.println(str);
        }
    }

    /**
     * Runs a five-way fork workflow with parallel fork action start enabled on a
     * callable queue limited to a single thread, and asserts that the job reaches
     * SUCCEEDED within the 20 second wait.
     *
     * @throws Exception if the workflow cannot be run or does not succeed
     */
    public void testPossibleDeadLock() throws Exception {
        setSystemProperty("oozie.services.ext", ExtendedCallableQueueService.class.getName());
        // Release the instance created in setUp() before replacing it, as testSuspendPoints does.
        this.services.destroy();
        this.services = new Services();
        this.services.getConf().setInt("oozie.service.CallableQueueService.threads", 1);
        this.services.init();
        ConfigurationService.setBoolean(PARALLEL_FORK_ACTION_START, true);

        String workflowXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"wf-fork\">"
                + "<start to=\"fork1\"/>"
                + "<fork name=\"fork1\">"
                + "<path start=\"action1\"/>"
                + "<path start=\"action2\"/>"
                + "<path start=\"action3\"/>"
                + "<path start=\"action4\"/>"
                + "<path start=\"action5\"/>"
                + "</fork>"
                + "<action name=\"action1\"><fs></fs><ok to=\"join1\"/><error to=\"kill\"/></action>"
                + "<action name=\"action2\"><fs></fs><ok to=\"join1\"/><error to=\"kill\"/></action>"
                + "<action name=\"action3\"><fs></fs><ok to=\"join1\"/><error to=\"kill\"/></action>"
                + "<action name=\"action4\"><fs></fs><ok to=\"join1\"/><error to=\"kill\"/></action>"
                + "<action name=\"action5\"><fs></fs><ok to=\"join1\"/><error to=\"kill\"/></action>"
                + "<join name=\"join1\" to=\"end\"/>"
                + "<kill name=\"kill\"><message>killed</message></kill>"
                + "<end name=\"end\"/>"
                + "</workflow-app>";

        XConfiguration jobConf = new XConfiguration();
        String workflowUri = getTestCaseFileUri("workflow.xml");
        writeToFile(workflowXml, workflowUri);
        jobConf.set(APP_PATH, workflowUri);
        jobConf.set(USER_NAME, getTestUser());

        final String jobId = (String) new SubmitXCommand(jobConf).call();
        new StartXCommand(jobId).call();
        waitFor(20000, new XTestCase.Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return loadJobStatus(jobId) == WorkflowJob.Status.SUCCEEDED;
            }
        });
        assertEquals(WorkflowJob.Status.SUCCEEDED, loadJobStatus(jobId));
    }

    /**
     * Destroys the current services and replaces them with a newly initialised instance.
     *
     * @throws Exception if initialisation fails
     */
    private void restartServices() throws Exception {
        this.services.destroy();
        this.services = new Services();
        this.services.init();
    }

    /**
     * Copies a classpath workflow definition to {@code app/workflow.xml} under the
     * test case directory of the test file system.
     *
     * @param resourceName name of the classpath resource holding the workflow XML
     * @return path of the written {@code workflow.xml}
     * @throws Exception if the resource cannot be read or the file cannot be written
     */
    private Path deployWorkflow(String resourceName) throws Exception {
        FileSystem fs = getFileSystem();
        Path appDir = new Path(getFsTestCaseDir(), "app");
        fs.mkdirs(appDir);
        Path workflowXml = new Path(appDir, "workflow.xml");
        // Resources close in reverse order: the writer first, then the reader.
        try (Reader source = IOUtils.getResourceAsReader(resourceName, -1);
                OutputStreamWriter target = new OutputStreamWriter(fs.create(workflowXml), StandardCharsets.UTF_8)) {
            IOUtils.copyCharStream(source, target);
        }
        return workflowXml;
    }

    /**
     * Reads the current status of a workflow job through {@link WorkflowJobQueryExecutor}.
     *
     * @param jobId id of the workflow job
     * @return the status stored for the job
     * @throws Exception if the query fails
     */
    private static WorkflowJob.Status loadJobStatus(String jobId) throws Exception {
        WorkflowJobBean bean = (WorkflowJobBean) WorkflowJobQueryExecutor.getInstance()
                .get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, new Object[]{jobId});
        return bean.getStatus();
    }

    /**
     * Tells whether {@code names} contains an entry equal to {@code name}.
     *
     * @param names candidate names
     * @param name  name to look for; may be {@code null}, which never matches
     * @return {@code true} if any entry equals {@code name}
     */
    private static boolean containsName(String[] names, String name) {
        for (String candidate : names) {
            if (candidate.equals(name)) {
                return true;
            }
        }
        return false;
    }
}
