/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.tests;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.apache.tez.tests.ExternalTezServiceTestHelper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestExternalTezServicesErrors {
    private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
    private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
    private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors";
    private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors";
    private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors";
    private static final String SUFFIX_LAUNCHER = "ContainerLauncher";
    private static final String SUFFIX_TASKCOMM = "TaskCommunicator";
    private static final String SUFFIX_SCHEDULER = "TaskScheduler";
    private static ExternalTezServiceTestHelper extServiceTestHelper;
    private static ServicePluginsDescriptor servicePluginsDescriptor;
    private static final Path SRC_DATA_DIR;
    private static final Path HASH_JOIN_EXPECTED_RESULT_PATH;
    private static final Path HASH_JOIN_OUTPUT_PATH;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL;
    private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT;
    private static String TEST_ROOT_DIR;

    @BeforeClass
    public static void setup() throws Exception {
        extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
        UserPayload userPayload = TezUtils.createUserPayloadFromConf((Configuration)extServiceTestHelper.getConfForJobs());
        UserPayload userPayloadThrowError = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf());
        UserPayload userPayloadReportFatalErrorLauncher = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER));
        UserPayload userPayloadReportFatalErrorTaskComm = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM));
        UserPayload userPayloadReportFatalErrorScheduler = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER));
        UserPayload userPayloadReportNonFatalErrorLauncher = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER));
        UserPayload userPayloadReportNonFatalErrorTaskComm = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM));
        UserPayload userPayloadReportNonFatalErrorScheduler = ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER));
        TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{(TaskSchedulerDescriptor)TaskSchedulerDescriptor.create((String)EXT_PUSH_ENTITY_NAME, (String)TezTestServiceTaskSchedulerService.class.getName()).setUserPayload(userPayload), (TaskSchedulerDescriptor)TaskSchedulerDescriptor.create((String)EXT_THROW_ERROR_ENTITY_NAME, (String)TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(userPayloadThrowError), (TaskSchedulerDescriptor)TaskSchedulerDescriptor.create((String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorScheduler), (TaskSchedulerDescriptor)TaskSchedulerDescriptor.create((String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorScheduler)};
        ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{(ContainerLauncherDescriptor)ContainerLauncherDescriptor.create((String)EXT_PUSH_ENTITY_NAME, (String)TezTestServiceNoOpContainerLauncher.class.getName()).setUserPayload(userPayload), (ContainerLauncherDescriptor)ContainerLauncherDescriptor.create((String)EXT_THROW_ERROR_ENTITY_NAME, (String)TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadThrowError), (ContainerLauncherDescriptor)ContainerLauncherDescriptor.create((String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorLauncher), (ContainerLauncherDescriptor)ContainerLauncherDescriptor.create((String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorLauncher)};
        TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{(TaskCommunicatorDescriptor)TaskCommunicatorDescriptor.create((String)EXT_PUSH_ENTITY_NAME, (String)TezTestServiceTaskCommunicatorImpl.class.getName()).setUserPayload(userPayload), (TaskCommunicatorDescriptor)TaskCommunicatorDescriptor.create((String)EXT_THROW_ERROR_ENTITY_NAME, (String)TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadThrowError), (TaskCommunicatorDescriptor)TaskCommunicatorDescriptor.create((String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorTaskComm), (TaskCommunicatorDescriptor)TaskCommunicatorDescriptor.create((String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, (String)TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorTaskComm)};
        servicePluginsDescriptor = ServicePluginsDescriptor.create((boolean)true, (boolean)true, (TaskSchedulerDescriptor[])taskSchedulerDescriptors, (ContainerLauncherDescriptor[])containerLauncherDescriptors, (TaskCommunicatorDescriptor[])taskCommunicatorDescriptors);
        extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
        Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
        Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
        extServiceTestHelper.setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);
        extServiceTestHelper.shutdownSharedTezClient();
    }

    @AfterClass
    public static void tearDown() throws IOException, TezException {
        extServiceTestHelper.tearDownAll();
    }

    @Test(timeout=90000L)
    public void testContainerLauncherThrowError() throws Exception {
        this.testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_THROW, SUFFIX_LAUNCHER, Lists.newArrayList((Object[])new String[]{"Service Error", DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name()}));
    }

    @Test(timeout=90000L)
    public void testTaskCommunicatorThrowError() throws Exception {
        this.testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_TASKCOMM_THROW, SUFFIX_TASKCOMM, Lists.newArrayList((Object[])new String[]{"Service Error", DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name()}));
    }

    @Test(timeout=90000L)
    public void testTaskSchedulerThrowError() throws Exception {
        this.testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_SCHEDULER_THROW, SUFFIX_SCHEDULER, Lists.newArrayList((Object[])new String[]{"Service Error", DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name()}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=150000L)
    public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
        String methodName = "testNonFatalErrors";
        TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
        TezClient tezClient = TezClient.newBuilder((String)(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session"), (TezConfiguration)tezClientConf).setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
        try {
            tezClient.start();
            LOG.info("TezSessionStarted for " + methodName);
            tezClient.waitTillReady();
            LOG.info("TezSession ready for submission for " + methodName);
            this.runAndVerifyForNonFatalErrors(tezClient, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
            this.runAndVerifyForNonFatalErrors(tezClient, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
            this.runAndVerifyForNonFatalErrors(tezClient, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
        }
        finally {
            tezClient.stop();
        }
    }

    @Test(timeout=90000L)
    public void testContainerLauncherReportFatalError() throws Exception {
        this.testFatalError("_testContainerLauncherReportFatalError_", EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL, SUFFIX_LAUNCHER, Lists.newArrayList((Object[])new String[]{"ReportedFatalError", ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    @Test(timeout=90000L)
    public void testTaskCommReportFatalError() throws Exception {
        this.testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL, SUFFIX_TASKCOMM, Lists.newArrayList((Object[])new String[]{"ReportedFatalError", ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    @Test(timeout=90000L)
    public void testTaskSchedulerReportFatalError() throws Exception {
        this.testFatalError("_testTaskSchedulerReportFatalError_", EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL, SUFFIX_SCHEDULER, Lists.newArrayList((Object[])new String[]{"ReportedFatalError", ServicePluginErrorDefaults.INCONSISTENT_STATE.name()}));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testFatalError(String methodName, Vertex.VertexExecutionContext lhsExecutionContext, String dagNameSuffix, List<String> expectedDiagMessages) throws IOException, TezException, YarnException, InterruptedException {
        TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
        TezClient tezClient = TezClient.newBuilder((String)(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session"), (TezConfiguration)tezClientConf).setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
        ApplicationId appId = null;
        try {
            String diag;
            tezClient.start();
            LOG.info("TezSessionStarted for " + methodName);
            tezClient.waitTillReady();
            LOG.info("TezSession ready for submission for " + methodName);
            JoinValidateConfigured joinValidate = new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, dagNameSuffix);
            DAG dag = joinValidate.createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3);
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates((Set)Sets.newHashSet((Object[])new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
            Assert.assertEquals((Object)DAGStatus.State.ERROR, (Object)dagStatus.getState());
            boolean foundDiag = false;
            Iterator iterator = dagStatus.getDiagnostics().iterator();
            while (iterator.hasNext() && !(foundDiag = this.checkDiag(diag = (String)iterator.next(), expectedDiagMessages))) {
            }
            appId = tezClient.getAppMasterApplicationId();
            Assert.assertTrue((boolean)foundDiag);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        finally {
            tezClient.stop();
        }
        if (appId != null) {
            YarnClient yarnClient = YarnClient.createYarnClient();
            try {
                yarnClient.init((Configuration)tezClientConf);
                yarnClient.start();
                ApplicationReport appReport = yarnClient.getApplicationReport(appId);
                YarnApplicationState appState = appReport.getYarnApplicationState();
                while (!EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED, YarnApplicationState.KILLED).contains(appState)) {
                    Thread.sleep(200L);
                    appReport = yarnClient.getApplicationReport(appId);
                    appState = appReport.getYarnApplicationState();
                }
                ApplicationAttemptId appAttemptId = appReport.getCurrentApplicationAttemptId();
                ApplicationAttemptReport appAttemptReport = yarnClient.getApplicationAttemptReport(appAttemptId);
                String diag = appAttemptReport.getDiagnostics();
                Assert.assertEquals((Object)FinalApplicationStatus.FAILED, (Object)appReport.getFinalApplicationStatus());
                Assert.assertEquals((Object)YarnApplicationState.FINISHED, (Object)appReport.getYarnApplicationState());
                this.checkDiag(diag, expectedDiagMessages);
            }
            finally {
                yarnClient.stop();
            }
        }
    }

    private boolean checkDiag(String diag, List<String> expected) {
        boolean found = true;
        for (String exp : expected) {
            if (diag.contains(exp)) {
                found = true;
                continue;
            }
            found = false;
            break;
        }
        return found;
    }

    private void runAndVerifyForNonFatalErrors(TezClient tezClient, String componentName, Vertex.VertexExecutionContext lhsContext) throws TezException, InterruptedException, IOException {
        LOG.info("Running JoinValidate with componentName reportNonFatalException");
        JoinValidateConfigured joinValidate = new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, EXECUTION_CONTEXT_EXT_SERVICE_PUSH, componentName);
        DAG dag = joinValidate.createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()), HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH, 3);
        DAGClient dagClient = tezClient.submitDAG(dag);
        DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates((Set)Sets.newHashSet((Object[])new StatusGetOpts[]{StatusGetOpts.GET_COUNTERS}));
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagStatus.getState());
        boolean foundDiag = false;
        for (String diag : dagStatus.getDiagnostics()) {
            if (!diag.contains("ReportedError") || !diag.contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())) continue;
            foundDiag = true;
            break;
        }
        Assert.assertTrue((boolean)foundDiag);
    }

    static {
        SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
        HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
        HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
        EXECUTION_CONTEXT_EXT_SERVICE_PUSH = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_LAUNCHER_THROW = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_THROW_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_TASKCOMM_THROW = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_THROW_ERROR_ENTITY_NAME);
        EXECUTION_CONTEXT_SCHEDULER_THROW = Vertex.VertexExecutionContext.create((String)EXT_THROW_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME);
        EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL = Vertex.VertexExecutionContext.create((String)EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL = Vertex.VertexExecutionContext.create((String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME);
        EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL = Vertex.VertexExecutionContext.create((String)EXT_REPORT_FATAL_ERROR_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME, (String)EXT_PUSH_ENTITY_NAME);
        EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
        TEST_ROOT_DIR = "target/" + TestExternalTezServicesErrors.class.getName() + "-tmpDir";
    }
}

