package org.apache.tez.tests;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/tests/TestExternalTezServicesErrors.class */
public class TestExternalTezServicesErrors {
    private static final String SUFFIX_LAUNCHER = "ContainerLauncher";
    private static final String SUFFIX_TASKCOMM = "TaskCommunicator";
    private static final String SUFFIX_SCHEDULER = "TaskScheduler";
    private static ExternalTezServiceTestHelper extServiceTestHelper;
    private static ServicePluginsDescriptor servicePluginsDescriptor;
    private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
    private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
    private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors";
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW = Vertex.VertexExecutionContext.create(EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors";
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors";
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL = Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL = Vertex.VertexExecutionContext.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
    private static String TEST_ROOT_DIR = "target/" + TestExternalTezServicesErrors.class.getName() + "-tmpDir";

    @BeforeClass
    public static void setup() throws Exception {
        extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
        UserPayload createUserPayloadFromConf = TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
        UserPayload userPayload = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf());
        UserPayload userPayload2 = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER));
        UserPayload userPayload3 = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM));
        servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true, new TaskSchedulerDescriptor[]{(TaskSchedulerDescriptor) TaskSchedulerDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName()).setUserPayload(createUserPayloadFromConf), (TaskSchedulerDescriptor) TaskSchedulerDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(userPayload), (TaskSchedulerDescriptor) TaskSchedulerDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER))), (TaskSchedulerDescriptor) TaskSchedulerDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER)))}, new ContainerLauncherDescriptor[]{(ContainerLauncherDescriptor) ContainerLauncherDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName()).setUserPayload(createUserPayloadFromConf), (ContainerLauncherDescriptor) ContainerLauncherDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload), (ContainerLauncherDescriptor) ContainerLauncherDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayload2), (ContainerLauncherDescriptor) ContainerLauncherDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER)))}, new TaskCommunicatorDescriptor[]{(TaskCommunicatorDescriptor) TaskCommunicatorDescriptor.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName()).setUserPayload(createUserPayloadFromConf), (TaskCommunicatorDescriptor) TaskCommunicatorDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME, TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload), (TaskCommunicatorDescriptor) TaskCommunicatorDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayload3), (TaskCommunicatorDescriptor) TaskCommunicatorDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM)))});
        extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
        extServiceTestHelper.setupHashJoinData(SRC_DATA_DIR, new Path(SRC_DATA_DIR, "inPath1"), new Path(SRC_DATA_DIR, "inPath2"), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);
        extServiceTestHelper.shutdownSharedTezClient();
    }

    @AfterClass
    public static void tearDown() throws IOException, TezException {
        extServiceTestHelper.tearDownAll();
    }

    @Test(timeout = 90000)
    public void testContainerLauncherThrowError() throws Exception {
        testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_THROW, SUFFIX_LAUNCHER, Lists.newArrayList(new String[]{"Service Error", DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name()}));
    }

    @Test(timeout = 90000)
    public void testTaskCommunicatorThrowError() throws Exception {
        testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_TASKCOMM_THROW, SUFFIX_TASKCOMM, Lists.newArrayList(new String[]{"Service Error", DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name()}));
    }

    @Test(timeout = 90000)
    public void testTaskSchedulerThrowError() throws Exception {
        testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_SCHEDULER_THROW, SUFFIX_SCHEDULER, Lists.newArrayList(new String[]{"Service Error", DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name()}));
    }

    @Test(timeout = 150000)
    public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
        TezClient build = TezClient.newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + "testNonFatalErrors_session", new TezConfiguration(extServiceTestHelper.getConfForJobs())).setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
        try {
            build.start();
            LOG.info("TezSessionStarted for testNonFatalErrors");
            build.waitTillReady();
            LOG.info("TezSession ready for submission for testNonFatalErrors");
            runAndVerifyForNonFatalErrors(build, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
            runAndVerifyForNonFatalErrors(build, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
            runAndVerifyForNonFatalErrors(build, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
            build.stop();
        } catch (Throwable th) {
            build.stop();
            throw th;
        }
    }

    @Test(timeout = 90000)
    public void testContainerLauncherReportFatalError() throws Exception {
        testFatalError("_testContainerLauncherReportFatalError_", EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL, SUFFIX_LAUNCHER, Lists.newArrayList(new String[]{ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    @Test(timeout = 90000)
    public void testTaskCommReportFatalError() throws Exception {
        testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL, SUFFIX_TASKCOMM, Lists.newArrayList(new String[]{ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    @Test(timeout = 90000)
    public void testTaskSchedulerReportFatalError() throws Exception {
        testFatalError("_testTaskSchedulerReportFatalError_", EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL, SUFFIX_SCHEDULER, Lists.newArrayList(new String[]{ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE, ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    private void testFatalError(String str, Vertex.VertexExecutionContext vertexExecutionContext, String str2, List<String> list) throws IOException, TezException, YarnException, InterruptedException {
        TezConfiguration tezConfiguration = new TezConfiguration(extServiceTestHelper.getConfForJobs());
        TezClient build = TezClient.newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + str + "_session", tezConfiguration).setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
        ApplicationId applicationId = null;
        try {
            try {
                build.start();
                LOG.info("TezSessionStarted for " + str);
                build.waitTillReady();
                LOG.info("TezSession ready for submission for " + str);
                DAGStatus waitForCompletionWithStatusUpdates = build.submitDAG(new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, vertexExecutionContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, str2).createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3)).waitForCompletionWithStatusUpdates(Sets.newHashSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
                Assert.assertEquals(DAGStatus.State.ERROR, waitForCompletionWithStatusUpdates.getState());
                boolean z = false;
                Iterator it = waitForCompletionWithStatusUpdates.getDiagnostics().iterator();
                while (it.hasNext()) {
                    z = checkDiag((String) it.next(), list);
                    if (z) {
                        break;
                    }
                }
                applicationId = build.getAppMasterApplicationId();
                Assert.assertTrue(z);
                build.stop();
            } catch (InterruptedException e) {
                e.printStackTrace();
                build.stop();
            }
            if (applicationId != null) {
                YarnClient createYarnClient = YarnClient.createYarnClient();
                try {
                    createYarnClient.init(tezConfiguration);
                    createYarnClient.start();
                    ApplicationReport applicationReport = createYarnClient.getApplicationReport(applicationId);
                    YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
                    while (!EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED).contains(yarnApplicationState)) {
                        Thread.sleep(200L);
                        applicationReport = createYarnClient.getApplicationReport(applicationId);
                        yarnApplicationState = applicationReport.getYarnApplicationState();
                    }
                    String diagnostics = createYarnClient.getApplicationAttemptReport(applicationReport.getCurrentApplicationAttemptId()).getDiagnostics();
                    Assert.assertEquals(FinalApplicationStatus.FAILED, applicationReport.getFinalApplicationStatus());
                    Assert.assertEquals(YarnApplicationState.FINISHED, applicationReport.getYarnApplicationState());
                    checkDiag(diagnostics, list);
                    createYarnClient.stop();
                } catch (Throwable th) {
                    createYarnClient.stop();
                    throw th;
                }
            }
        } catch (Throwable th2) {
            build.stop();
            throw th2;
        }
    }

    private boolean checkDiag(String str, List<String> list) {
        boolean z = true;
        Iterator<String> it = list.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (!str.contains(it.next())) {
                z = false;
                break;
            }
            z = true;
        }
        return z;
    }

    private void runAndVerifyForNonFatalErrors(TezClient tezClient, String str, Vertex.VertexExecutionContext vertexExecutionContext) throws TezException, InterruptedException, IOException {
        LOG.info("Running JoinValidate with componentName reportNonFatalException");
        DAGStatus waitForCompletionWithStatusUpdates = tezClient.submitDAG(new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, vertexExecutionContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, str).createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3)).waitForCompletionWithStatusUpdates(Sets.newHashSet(new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        Assert.assertEquals(DAGStatus.State.FAILED, waitForCompletionWithStatusUpdates.getState());
        boolean z = false;
        Iterator it = waitForCompletionWithStatusUpdates.getDiagnostics().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String str2 = (String) it.next();
            if (str2.contains(ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE) && str2.contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())) {
                z = true;
                break;
            }
        }
        Assert.assertTrue(z);
    }
}
