package org.apache.tez.test;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/test/TestTaskErrorsUsingLocalMode.class */
public class TestTaskErrorsUsingLocalMode {
    private static final Logger LOG = LoggerFactory.getLogger(TestTaskErrorsUsingLocalMode.class);
    private static final String VERTEX_NAME = "vertex1";

    /* loaded from: input_file:org/apache/tez/test/TestTaskErrorsUsingLocalMode$FailingProcessor.class */
    public static class FailingProcessor extends AbstractLogicalIOProcessor {
        private static final String FAIL_STRING_NON_FATAL = "non-fatal-fail";
        private static final String FAIL_STRING_FATAL = "fatal-fail";
        private static final String KILL_STRING = "kill-self";
        private static volatile boolean shouldFail;
        private static volatile boolean fatalError;
        private static volatile boolean shouldKill;
        private static volatile int killModeAttemptNumberToSucceed;

        static void reset() {
            shouldFail = false;
            fatalError = false;
            shouldKill = false;
            killModeAttemptNumberToSucceed = -1;
        }

        static void configureForNonFatalFail() {
            reset();
            shouldFail = true;
        }

        static void configureForFatalFail() {
            reset();
            shouldFail = true;
            fatalError = true;
        }

        static void configureForKilled(int i) {
            reset();
            shouldKill = true;
            killModeAttemptNumberToSucceed = i;
        }

        public FailingProcessor(ProcessorContext processorContext) {
            super(processorContext);
        }

        public void initialize() throws Exception {
        }

        public void handleEvents(List<Event> list) {
        }

        public void close() throws Exception {
        }

        public void run(Map<String, LogicalInput> map, Map<String, LogicalOutput> map2) throws Exception {
            TestTaskErrorsUsingLocalMode.LOG.info("Running Failing processor");
            if (!shouldFail) {
                if (!shouldKill || getContext().getTaskAttemptNumber() == killModeAttemptNumberToSucceed) {
                    return;
                }
                TestTaskErrorsUsingLocalMode.LOG.info("Reporting self-kill for attempt=" + getContext().getTaskAttemptNumber());
                getContext().killSelf((Throwable) null, KILL_STRING);
                return;
            }
            if (fatalError) {
                TestTaskErrorsUsingLocalMode.LOG.info("Reporting fatal error");
                getContext().reportFailure(TaskFailureType.FATAL, (Throwable) null, FAIL_STRING_FATAL);
            } else {
                TestTaskErrorsUsingLocalMode.LOG.info("Reporting non-fatal error");
                getContext().reportFailure(TaskFailureType.NON_FATAL, (Throwable) null, FAIL_STRING_NON_FATAL);
            }
        }

        static {
            reset();
        }
    }

    @Test(timeout = 20000)
    public void testFatalErrorReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testFatalErrorReported");
        DAGClient dAGClient = null;
        try {
            FailingProcessor.configureForFatalFail();
            dAGClient = tezClient.submitDAG(DAG.create("testFatalErrorReportedDag").addVertex(Vertex.create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)));
            dAGClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.FAILED, dAGClient.getDAGStatus((Set) null).getState());
            Assert.assertEquals(1L, dAGClient.getVertexStatus(VERTEX_NAME, (Set) null).getProgress().getFailedTaskAttemptCount());
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
        } catch (Throwable th) {
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testNonFatalErrorReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testNonFatalErrorReported");
        DAGClient dAGClient = null;
        try {
            FailingProcessor.configureForNonFatalFail();
            dAGClient = tezClient.submitDAG(DAG.create("testNonFatalErrorReported").addVertex(Vertex.create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)));
            dAGClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.FAILED, dAGClient.getDAGStatus((Set) null).getState());
            Assert.assertEquals(4L, dAGClient.getVertexStatus(VERTEX_NAME, (Set) null).getProgress().getFailedTaskAttemptCount());
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
        } catch (Throwable th) {
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSelfKillReported() throws IOException, TezException, InterruptedException {
        TezClient tezClient = getTezClient("testSelfKillReported");
        DAGClient dAGClient = null;
        try {
            FailingProcessor.configureForKilled(10);
            dAGClient = tezClient.submitDAG(DAG.create("testSelfKillReported").addVertex(Vertex.create(VERTEX_NAME, ProcessorDescriptor.create(FailingProcessor.class.getName()), 1)));
            dAGClient.waitForCompletion();
            Assert.assertEquals(DAGStatus.State.SUCCEEDED, dAGClient.getDAGStatus((Set) null).getState());
            Assert.assertEquals(10L, dAGClient.getVertexStatus(VERTEX_NAME, (Set) null).getProgress().getKilledTaskAttemptCount());
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
        } catch (Throwable th) {
            if (dAGClient != null) {
                dAGClient.close();
            }
            tezClient.stop();
            throw th;
        }
    }

    private TezClient getTezClient(String str) throws IOException, TezException {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.setBoolean("tez.local.mode", true);
        tezConfiguration.set("fs.defaultFS", "file:///");
        tezConfiguration.setBoolean("tez.runtime.optimize.local.fetch", true);
        tezConfiguration.setLong("tez.am.sleep.time.before.exit.millis", 500L);
        TezClient create = TezClient.create(str, tezConfiguration, true);
        create.start();
        return create;
    }
}
