package org.apache.tez.tests;

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.service.impl.ContainerRunnerImpl;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/tests/TestExternalTezServices.class */
public class TestExternalTezServices {
    private static ExternalTezServiceTestHelper extServiceTestHelper;
    private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServices.class);
    private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServices.class.getSimpleName());
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
    private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_REGULAR_CONTAINERS = Vertex.VertexExecutionContext.createExecuteInContainers(true);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_IN_AM = Vertex.VertexExecutionContext.createExecuteInAm(true);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
    private static String TEST_ROOT_DIR = "target/" + TestExternalTezServices.class.getName() + "-tmpDir";

    @BeforeClass
    public static void setup() throws Exception {
        extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
        UserPayload createUserPayloadFromConf = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
        extServiceTestHelper.setupSharedTezClient(ServicePluginsDescriptor.create(true, true, new TaskSchedulerDescriptor[]{(TaskSchedulerDescriptor) TaskSchedulerDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()).setUserPayload(createUserPayloadFromConf)}, new ContainerLauncherDescriptor[]{(ContainerLauncherDescriptor) ContainerLauncherDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()).setUserPayload(createUserPayloadFromConf)}, new TaskCommunicatorDescriptor[]{(TaskCommunicatorDescriptor) TaskCommunicatorDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()).setUserPayload(createUserPayloadFromConf)}));
        extServiceTestHelper.setupHashJoinData(SRC_DATA_DIR, new Path(SRC_DATA_DIR, "inPath1"), new Path(SRC_DATA_DIR, "inPath2"), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);
    }

    @AfterClass
    public static void tearDown() throws IOException, TezException {
        extServiceTestHelper.tearDownAll();
    }

    @Test(timeout = 60000)
    public void testAllInService() throws Exception {
        runJoinValidate("AllInService", 7, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
    }

    @Test(timeout = 60000)
    public void testAllInContainers() throws Exception {
        runJoinValidate("AllInContainers", 0, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
    }

    @Test(timeout = 60000)
    public void testAllInAM() throws Exception {
        runJoinValidate("AllInAM", 0, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM);
    }

    @Test(timeout = 60000)
    public void testMixed1() throws Exception {
        runJoinValidate("Mixed1", 4, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
    }

    @Test(timeout = 60000)
    public void testMixed2() throws Exception {
        runJoinValidate("Mixed2", 3, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
    }

    @Test(timeout = 60000)
    public void testMixed3() throws Exception {
        runJoinValidate("Mixed3", 4, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
    }

    @Test(timeout = 60000)
    public void testMixed4() throws Exception {
        runJoinValidate("Mixed4", 0, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_IN_AM);
    }

    @Test(timeout = 60000)
    public void testMixed5() throws Exception {
        runJoinValidate("Mixed5", 2, EXECUTION_CONTEXT_REGULAR_CONTAINERS, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_IN_AM);
    }

    @Test(timeout = 60000)
    public void testMixed6() throws Exception {
        runJoinValidate("Mixed6", 3, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
    }

    @Test(timeout = 60000)
    public void testMixed7() throws Exception {
        runJoinValidate("Mixed7", 0, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_IN_AM, EXECUTION_CONTEXT_REGULAR_CONTAINERS);
    }

    @Test(timeout = 60000)
    public void testErrorPropagation() throws TezException, InterruptedException, IOException {
        runExceptionSimulation();
    }

    private void runExceptionSimulation() throws IOException, TezException, InterruptedException {
        DAG create = DAG.create(ContainerRunnerImpl.DAG_NAME_INSTRUMENTED_FAILURES);
        Vertex create2 = Vertex.create("Vertex1", ProcessorDescriptor.create(SleepProcessor.class.getName()), 3);
        create2.setExecutionContext(EXECUTION_CONTEXT_EXT_SERVICE_PUSH);
        create.addVertex(create2);
        Assert.assertEquals(DAGStatus.State.SUCCEEDED, extServiceTestHelper.getSharedTezClient().submitDAG(create).waitForCompletion().getState());
        Assert.assertEquals(1L, r0.getDAGProgress().getFailedTaskAttemptCount());
        Assert.assertEquals(1L, r0.getDAGProgress().getKilledTaskAttemptCount());
    }

    private void runJoinValidate(String str, int i, Vertex.VertexExecutionContext vertexExecutionContext, Vertex.VertexExecutionContext vertexExecutionContext2, Vertex.VertexExecutionContext vertexExecutionContext3) throws Exception {
        int numSubmissions = extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions();
        Assert.assertEquals(0L, new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, vertexExecutionContext, vertexExecutionContext2, vertexExecutionContext3, str).run(new TezConfiguration(extServiceTestHelper.getConfForJobs()), new String[]{"-disableSplitGrouping", HASH_JOIN_EXPECTED_RESULT_PATH.toString(), HASH_JOIN_OUTPUT_PATH.toString(), "3"}, extServiceTestHelper.getSharedTezClient()));
        Assert.assertEquals(i, extServiceTestHelper.getTezTestServiceCluster().getNumSubmissions() - numSubmissions);
    }
}
