package org.apache.tez.dag.app.dag.impl;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.SpeculatorEventTaskAttemptStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TezBuilderUtils;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ServicePluginException;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestTaskAttempt.class */
public class TestTaskAttempt {
    private static final Logger LOG = LoggerFactory.getLogger(TestTaskAttempt.class);
    AppContext appCtx;
    TaskLocationHint locationHint;
    Vertex mockVertex;
    TezConfiguration vertexConf = new TezConfiguration();
    ServicePluginInfo servicePluginInfo = new ServicePluginInfo().setContainerLauncherName(TezConstants.getTezYarnServicePluginName());

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestTaskAttempt$MockEventHandler.class */
    public static class MockEventHandler implements EventHandler {
        public boolean internalError;

        public void handle(Event event) {
            if ((event instanceof DAGEvent) && DAGEventType.INTERNAL_ERROR == ((DAGEvent) event).getType()) {
                this.internalError = true;
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestTaskAttempt$MockTaskAttemptImpl.class */
    private class MockTaskAttemptImpl extends TaskAttemptImpl {
        public int taskAttemptStartedEventLogged;
        public int taskAttemptFinishedEventLogged;
        boolean inputFailedReported;

        public MockTaskAttemptImpl(TezTaskID tezTaskID, int i, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Configuration configuration, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, boolean z, Resource resource, ContainerContext containerContext, boolean z2) {
            super(TezBuilderUtils.newTaskAttemptId(tezTaskID, i), eventHandler, taskCommunicatorManagerInterface, configuration, clock, taskHeartbeatHandler, appContext, z, resource, containerContext, z2, TestTaskAttempt.this.mockVertex, TestTaskAttempt.this.locationHint, (TaskSpec) null, (TezTaskAttemptID) null, (Set) null);
            this.taskAttemptStartedEventLogged = 0;
            this.taskAttemptFinishedEventLogged = 0;
            this.inputFailedReported = false;
        }

        protected Vertex getVertex() {
            return TestTaskAttempt.this.mockVertex;
        }

        protected void logJobHistoryAttemptStarted() {
            this.taskAttemptStartedEventLogged++;
            super.logJobHistoryAttemptStarted();
        }

        protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal taskAttemptStateInternal) {
            this.taskAttemptFinishedEventLogged++;
            super.logJobHistoryAttemptFinishedEvent(taskAttemptStateInternal);
        }

        protected void logJobHistoryAttemptUnsuccesfulCompletion(TaskAttemptState taskAttemptState, TaskFailureType taskFailureType) {
            this.taskAttemptFinishedEventLogged++;
            super.logJobHistoryAttemptUnsuccesfulCompletion(taskAttemptState, taskFailureType);
        }

        protected void sendInputFailedToConsumers() {
            this.inputFailedReported = true;
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestTaskAttempt$StubbedFS.class */
    public static class StubbedFS extends RawLocalFileSystem {
        public FileStatus getFileStatus(Path path) throws IOException {
            return new FileStatus(1L, false, 1, 1L, 1L, path);
        }
    }

    @BeforeClass
    public static void setup() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    @Before
    public void setupTest() {
        this.appCtx = (AppContext) Mockito.mock(AppContext.class);
        Mockito.when(this.appCtx.getAMConf()).thenReturn(new Configuration());
        Mockito.when(this.appCtx.getContainerLauncherName(Matchers.anyInt())).thenReturn(TezConstants.getTezYarnServicePluginName());
        createMockVertex(this.vertexConf);
        ((AppContext) Mockito.doReturn((HistoryEventHandler) Mockito.mock(HistoryEventHandler.class)).when(this.appCtx)).getHistoryHandler();
        LogManager.getRootLogger().setLevel(Level.DEBUG);
    }

    private void createMockVertex(Configuration configuration) {
        this.mockVertex = (Vertex) Mockito.mock(Vertex.class);
        Mockito.when(this.mockVertex.getServicePluginInfo()).thenReturn(this.servicePluginInfo);
        Mockito.when(this.mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(configuration));
    }

    @Test(timeout = 5000)
    public void testLocalityRequest() {
        TaskAttemptImpl.ScheduleTaskattemptTransition scheduleTaskattemptTransition = new TaskAttemptImpl.ScheduleTaskattemptTransition();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        TreeSet treeSet = new TreeSet();
        treeSet.add("host1");
        treeSet.add("host2");
        treeSet.add("host3");
        this.locationHint = TaskLocationHint.createTaskLocationHint(treeSet, (Set) null);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1), 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl, (TaskAttemptEventSchedule) Mockito.mock(TaskAttemptEventSchedule.class));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
        if (!(forClass.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) {
            Assert.fail("Second event not of type " + AMSchedulerEventTALaunchRequest.class.getName());
        }
        Assert.assertEquals(1L, ((TaskAttemptImpl) mockTaskAttemptImpl).taskRacks.size());
        Assert.assertEquals(3L, ((TaskAttemptImpl) mockTaskAttemptImpl).taskHosts.size());
        for (int i = 0; i < 3; i++) {
            String str = "host" + (i + 1);
            Assert.assertEquals(str, true, Boolean.valueOf(((TaskAttemptImpl) mockTaskAttemptImpl).taskHosts.contains(str)));
        }
    }

    @Test(timeout = 5000)
    public void testRetriesAtSamePriorityConfig() {
        TezConfiguration tezConfiguration = new TezConfiguration();
        tezConfiguration.setBoolean("tez.am.task.reschedule.higher.priority", false);
        tezConfiguration.setBoolean("tez.am.task.reschedule.relaxed.locality", true);
        Mockito.when(this.mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(tezConfiguration));
        TreeSet treeSet = new TreeSet();
        treeSet.add("host1");
        this.locationHint = TaskLocationHint.createTaskLocationHint(treeSet, (Set) null);
        TaskAttemptImpl.ScheduleTaskattemptTransition scheduleTaskattemptTransition = new TaskAttemptImpl.ScheduleTaskattemptTransition();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
        MockTaskAttemptImpl mockTaskAttemptImpl2 = new MockTaskAttemptImpl(tezTaskID, 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        TaskAttemptEventSchedule taskAttemptEventSchedule = (TaskAttemptEventSchedule) Mockito.mock(TaskAttemptEventSchedule.class);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityLowLimit())).thenReturn(3);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityHighLimit())).thenReturn(1);
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
        AMSchedulerEventTALaunchRequest aMSchedulerEventTALaunchRequest = (AMSchedulerEventTALaunchRequest) forClass.getValue();
        Assert.assertEquals(2L, aMSchedulerEventTALaunchRequest.getPriority());
        Assert.assertEquals(1L, aMSchedulerEventTALaunchRequest.getLocationHint().getHosts().size());
        Assert.assertTrue(aMSchedulerEventTALaunchRequest.getLocationHint().getHosts().contains("host1"));
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl2, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(2))).handle((Event) forClass.capture());
        AMSchedulerEventTALaunchRequest aMSchedulerEventTALaunchRequest2 = (AMSchedulerEventTALaunchRequest) forClass.getValue();
        Assert.assertEquals(2L, aMSchedulerEventTALaunchRequest2.getPriority());
        Assert.assertNull(aMSchedulerEventTALaunchRequest2.getLocationHint());
    }

    @Test(timeout = 5000)
    public void testPriority() {
        TaskAttemptImpl.ScheduleTaskattemptTransition scheduleTaskattemptTransition = new TaskAttemptImpl.ScheduleTaskattemptTransition();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
        MockTaskAttemptImpl mockTaskAttemptImpl2 = new MockTaskAttemptImpl(tezTaskID, 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        TaskAttemptEventSchedule taskAttemptEventSchedule = (TaskAttemptEventSchedule) Mockito.mock(TaskAttemptEventSchedule.class);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityLowLimit())).thenReturn(3);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityHighLimit())).thenReturn(1);
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
        Assert.assertEquals(2L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl2, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(2))).handle((Event) forClass.capture());
        Assert.assertEquals(1L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityLowLimit())).thenReturn(6);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityHighLimit())).thenReturn(4);
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(3))).handle((Event) forClass.capture());
        Assert.assertEquals(5L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl2, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(4))).handle((Event) forClass.capture());
        Assert.assertEquals(4L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityLowLimit())).thenReturn(5);
        Mockito.when(Integer.valueOf(taskAttemptEventSchedule.getPriorityHighLimit())).thenReturn(5);
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        Assert.assertEquals(5L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
        scheduleTaskattemptTransition.transition(mockTaskAttemptImpl2, taskAttemptEventSchedule);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(6))).handle((Event) forClass.capture());
        Assert.assertEquals(5L, ((AMSchedulerEventTALaunchRequest) forClass.getValue()).getPriority());
    }

    @Test(timeout = 5000)
    public void testHostResolveAttempt() throws Exception {
        TaskAttemptImpl.ScheduleTaskattemptTransition scheduleTaskattemptTransition = new TaskAttemptImpl.ScheduleTaskattemptTransition();
        EventHandler eventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        String[] strArr = {"127.0.0.1", "host2", "host3"};
        TreeSet treeSet = new TreeSet(Arrays.asList("host1", "host2", "host3"));
        this.locationHint = TaskLocationHint.createTaskLocationHint(new TreeSet(Arrays.asList(strArr)), (Set) null);
        TaskAttemptImpl taskAttemptImpl = (TaskAttemptImpl) Mockito.spy(new MockTaskAttemptImpl(TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1), 1, eventHandler, (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false));
        Mockito.when(taskAttemptImpl.resolveHosts(strArr)).thenReturn((String[]) treeSet.toArray(new String[3]));
        scheduleTaskattemptTransition.transition(taskAttemptImpl, (TaskAttemptEventSchedule) Mockito.mock(TaskAttemptEventSchedule.class));
        ((TaskAttemptImpl) Mockito.verify(taskAttemptImpl)).resolveHosts(strArr);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ((EventHandler) Mockito.verify(eventHandler, Mockito.times(1))).handle((Event) forClass.capture());
        if (!(forClass.getAllValues().get(0) instanceof AMSchedulerEventTALaunchRequest)) {
            Assert.fail("Second Event not of type ContainerRequestEvent");
        }
        HashMap hashMap = new HashMap();
        hashMap.put("host1", true);
        hashMap.put("host2", true);
        hashMap.put("host3", true);
        Iterator it = taskAttemptImpl.taskHosts.iterator();
        while (it.hasNext()) {
            hashMap.remove((String) it.next());
        }
        Assert.assertEquals(0L, hashMap.size());
    }

    @Test(timeout = 5000)
    public void testLaunchFailedWhileKilling() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        MockEventHandler mockEventHandler = new MockEventHandler();
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        AppContext appContext = this.appCtx;
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(appContext)).getClusterInfo();
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), appContext, false, newInstance3, createFakeContainerContext(), false);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(tezTaskAttemptID, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventKillRequest(tezTaskAttemptID, (String) null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
        Assert.assertEquals(TaskAttemptStateInternal.KILL_IN_PROGRESS, mockTaskAttemptImpl.getInternalState());
        mockTaskAttemptImpl.handle(new TaskAttemptEventTezEventUpdate(mockTaskAttemptImpl.getID(), Collections.EMPTY_LIST));
        Assert.assertFalse("InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in KILL_IN_PROGRESS state", mockEventHandler.internalError);
        mockTaskAttemptImpl.handle(new TaskAttemptEventKillRequest(tezTaskAttemptID, (String) null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
        Assert.assertFalse(mockEventHandler.internalError);
    }

    @Test(timeout = 5000)
    public void testContainerTerminationWhileRunning() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        Assert.assertEquals("Task attempt is not in the STARTING state", mockTaskAttemptImpl.getState(), TaskAttemptState.STARTING);
        Assert.assertEquals("Task attempt internal state is not at SUBMITTED", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.SUBMITTED);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminating(id, "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
        Assert.assertFalse("InternalError occurred trying to handle TA_CONTAINER_TERMINATING", mockEventHandler.internalError);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals("Task attempt is not in the  FAILED state", mockTaskAttemptImpl.getState(), TaskAttemptState.FAILED);
        Assert.assertEquals(1L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("Terminating", mockTaskAttemptImpl.getDiagnostics().get(0));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskFailureType.NON_FATAL, verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTAFailed.class, 1).getTaskFailureType());
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i + 0))).handle((Event) ArgumentCaptor.forClass(Event.class).capture());
        Assert.assertEquals(2L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("Terminated", mockTaskAttemptImpl.getDiagnostics().get(1));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testContainerTerminatedWhileRunning() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        Assert.assertEquals("Task attempt is not in running state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
        Assert.assertFalse("InternalError occurred trying to handle TA_CONTAINER_TERMINATED", mockEventHandler.internalError);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals("Terminated", mockTaskAttemptImpl.getDiagnostics().get(0));
        Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, mockTaskAttemptImpl.getTerminationCause());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskFailureType.NON_FATAL, verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTAFailed.class, 1).getTaskFailureType());
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i + 0))).handle((Event) ArgumentCaptor.forClass(Event.class).capture());
    }

    @Test(timeout = 5000)
    public void testContainerTerminatedAfterSuccess() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTASucceeded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i + 0))).handle((Event) ArgumentCaptor.forClass(Event.class).capture());
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, mockTaskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testLastDataEventRecording() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setBoolean("tez.am.speculation.enabled", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        TezTaskAttemptID tezTaskAttemptID = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        TezTaskAttemptID tezTaskAttemptID2 = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        TezEvent tezEvent = (TezEvent) Mockito.mock(TezEvent.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(Long.valueOf(tezEvent.getEventReceivedTime())).thenReturn(1024L);
        Mockito.when(tezEvent.getSourceInfo().getTaskAttemptID()).thenReturn(tezTaskAttemptID);
        TezEvent tezEvent2 = (TezEvent) Mockito.mock(TezEvent.class, Mockito.RETURNS_DEEP_STUBS);
        Mockito.when(Long.valueOf(tezEvent2.getEventReceivedTime())).thenReturn(2048L);
        Mockito.when(tezEvent2.getSourceInfo().getTaskAttemptID()).thenReturn(tezTaskAttemptID2);
        TaskAttemptEventStatusUpdate taskAttemptEventStatusUpdate = new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false));
        Assert.assertEquals(0L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.size());
        mockTaskAttemptImpl.setLastEventSent(tezEvent);
        Assert.assertEquals(1L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.size());
        Assert.assertEquals(1024L, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(0)).getTimestamp());
        Assert.assertEquals(tezTaskAttemptID, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(0)).getTaskAttemptId());
        mockTaskAttemptImpl.handle(taskAttemptEventStatusUpdate);
        mockTaskAttemptImpl.setLastEventSent(tezEvent2);
        Assert.assertEquals(1L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.size());
        Assert.assertEquals(2048L, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(0)).getTimestamp());
        Assert.assertEquals(tezTaskAttemptID2, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(0)).getTaskAttemptId());
        taskAttemptEventStatusUpdate.setReadErrorReported(true);
        mockTaskAttemptImpl.handle(taskAttemptEventStatusUpdate);
        mockTaskAttemptImpl.setLastEventSent(tezEvent);
        Assert.assertEquals(2L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.size());
        Assert.assertEquals(1024L, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(1)).getTimestamp());
        Assert.assertEquals(tezTaskAttemptID, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(1)).getTaskAttemptId());
        mockTaskAttemptImpl.setLastEventSent(tezEvent2);
        Assert.assertEquals(2L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.size());
        Assert.assertEquals(2048L, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(1)).getTimestamp());
        Assert.assertEquals(tezTaskAttemptID2, ((TaskAttemptImpl.DataEventDependencyInfo) ((TaskAttemptImpl) mockTaskAttemptImpl).lastDataEvents.get(1)).getTaskAttemptId());
    }

    @Test(timeout = 5000)
    public void testFailure() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setBoolean("tez.am.speculation.enabled", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(6))).handle((Event) forClass.capture());
        verifyEventType(forClass.getAllValues().subList(0, 6), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        mockTaskAttemptImpl.handle(new TaskAttemptEventAttemptFailed(id, TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "0", TaskAttemptTerminationCause.APPLICATION_ERROR));
        Assert.assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.FAIL_IN_PROGRESS);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(1L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("0", mockTaskAttemptImpl.getDiagnostics().get(0));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
        Assert.assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, mockTaskAttemptImpl.getInternalState());
        mockTaskAttemptImpl.handle(new TaskAttemptEventTezEventUpdate(mockTaskAttemptImpl.getID(), Collections.EMPTY_LIST));
        Assert.assertFalse("InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state", mockEventHandler.internalError);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "1", TaskAttemptTerminationCause.CONTAINER_EXITED));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        Assert.assertEquals(2L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("1", mockTaskAttemptImpl.getDiagnostics().get(1));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
        int i = 6 + 5;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskFailureType.NON_FATAL, verifyEventType(forClass2.getAllValues().subList(6, i), TaskEventTAFailed.class, 1).getTaskFailureType());
        verifyEventType(forClass2.getAllValues().subList(6, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), DAGEventCounterUpdate.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
    }

    @Test(timeout = 5000)
    public void testFailureFatalError() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setBoolean("tez.am.speculation.enabled", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(6))).handle((Event) forClass.capture());
        verifyEventType(forClass.getAllValues().subList(0, 6), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        mockTaskAttemptImpl.handle(new TaskAttemptEventAttemptFailed(id, TaskAttemptEventType.TA_FAILED, TaskFailureType.FATAL, "0", TaskAttemptTerminationCause.APPLICATION_ERROR));
        Assert.assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.FAIL_IN_PROGRESS);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(1L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("0", mockTaskAttemptImpl.getDiagnostics().get(0));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
        Assert.assertEquals(TaskAttemptStateInternal.FAIL_IN_PROGRESS, mockTaskAttemptImpl.getInternalState());
        mockTaskAttemptImpl.handle(new TaskAttemptEventTezEventUpdate(mockTaskAttemptImpl.getID(), Collections.EMPTY_LIST));
        Assert.assertFalse("InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in FAIL_IN_PROGRESS state", mockEventHandler.internalError);
        mockTaskAttemptImpl.handle(new TaskAttemptEventContainerTerminated(newInstance5, id, "1", TaskAttemptTerminationCause.CONTAINER_EXITED));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        Assert.assertEquals(2L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals("1", mockTaskAttemptImpl.getDiagnostics().get(1));
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, mockTaskAttemptImpl.getTerminationCause());
        int i = 6 + 5;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskFailureType.FATAL, verifyEventType(forClass2.getAllValues().subList(6, i), TaskEventTAFailed.class, 1).getTaskFailureType());
        verifyEventType(forClass2.getAllValues().subList(6, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), DAGEventCounterUpdate.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
    }

    @Test
    public void testProgressTimeStampUpdate() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setLong("tez.task.progress.stuck.interval-ms", 75L);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(50L);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, clock, taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(100L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.atLeast(1))).handle((Event) forClass.capture());
        if (forClass.getValue() instanceof TaskAttemptEventAttemptFailed) {
            TaskAttemptEventAttemptFailed taskAttemptEventAttemptFailed = (TaskAttemptEventAttemptFailed) forClass.getValue();
            Assert.assertEquals(mockTaskAttemptImpl.getID(), taskAttemptEventAttemptFailed.getTaskAttemptID());
            Assert.assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taskAttemptEventAttemptFailed.getTerminationCause());
            mockTaskAttemptImpl.handle(taskAttemptEventAttemptFailed);
            Assert.fail("Should not fail since the timestamps do not differ by progress interval config");
        } else {
            Assert.assertEquals("Task Attempt's internal state should be RUNNING!", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.RUNNING);
        }
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(200L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.atLeast(1))).handle((Event) forClass.capture());
        Assert.assertTrue("This should have been an attempt failed event!", forClass.getValue() instanceof TaskAttemptEventAttemptFailed);
    }

    @Test
    public void testStatusUpdateWithNullCounters() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newContainerId = ContainerId.newContainerId(newInstance2, 3L);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newContainerId);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newContainerId));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        TezCounters tezCounters = new TezCounters();
        tezCounters.findCounter("group", "counter").increment(1L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent(tezCounters, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(1L, mockTaskAttemptImpl.getCounters().findCounter("group", "counter").getValue());
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(1L, mockTaskAttemptImpl.getCounters().findCounter("group", "counter").getValue());
        tezCounters.findCounter("group", "counter").increment(1L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent(tezCounters, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(2L, mockTaskAttemptImpl.getCounters().findCounter("group", "counter").getValue());
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(2L, mockTaskAttemptImpl.getCounters().findCounter("group", "counter").getValue());
    }

    @Test(timeout = 60000)
    public void testProgressAfterSubmit() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setLong("tez.task.progress.stuck.interval-ms", 50L);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockClock mockClock = new MockClock();
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, mockClock, taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockClock.incrementTime(20L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockClock.incrementTime(55L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.atLeast(1))).handle((Event) forClass.capture());
        if (forClass.getValue() instanceof TaskAttemptEvent) {
            mockTaskAttemptImpl.handle((TaskAttemptEvent) forClass.getValue());
        }
        Assert.assertEquals("Task Attempt's internal state should be SUBMITTED!", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.SUBMITTED);
    }

    @Test(timeout = 5000)
    public void testNoProgressFail() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setLong("tez.task.progress.stuck.interval-ms", 75L);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, clock, taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(100L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, true)));
        Assert.assertEquals(100L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastNotifyProgressTimestamp);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(150L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, true)));
        Assert.assertEquals(150L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastNotifyProgressTimestamp);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(200L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(150L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastNotifyProgressTimestamp);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(250L);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        Assert.assertEquals(150L, ((TaskAttemptImpl) mockTaskAttemptImpl).lastNotifyProgressTimestamp);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.atLeast(1))).handle((Event) forClass.capture());
        TaskAttemptEventAttemptFailed taskAttemptEventAttemptFailed = (TaskAttemptEventAttemptFailed) forClass.getValue();
        Assert.assertEquals(mockTaskAttemptImpl.getID(), taskAttemptEventAttemptFailed.getTaskAttemptID());
        Assert.assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, taskAttemptEventAttemptFailed.getTerminationCause());
        Assert.assertEquals(TaskFailureType.NON_FATAL, taskAttemptEventAttemptFailed.getTaskFailureType());
        mockTaskAttemptImpl.handle(taskAttemptEventAttemptFailed);
        Assert.assertEquals("Task attempt is not in the  FAIL_IN_PROGRESS state", mockTaskAttemptImpl.getInternalState(), TaskAttemptStateInternal.FAIL_IN_PROGRESS);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(1L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals(TaskAttemptTerminationCause.NO_PROGRESS, mockTaskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testEventSerializingHash() throws Exception {
        TezVertexID tezVertexID = TezVertexID.getInstance(TezDAGID.getInstance(ApplicationId.newInstance(1L, 2), 1), 1);
        TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, 1);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(tezVertexID, 2);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(tezTaskID, 1);
        TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(tezTaskID2, 1);
        TaskAttemptEvent taskAttemptEvent = new TaskAttemptEvent(tezTaskAttemptID, TaskAttemptEventType.TA_FAILED);
        TaskAttemptEvent taskAttemptEvent2 = new TaskAttemptEvent(tezTaskAttemptID, TaskAttemptEventType.TA_KILL_REQUEST);
        TaskAttemptEvent taskAttemptEvent3 = new TaskAttemptEvent(tezTaskAttemptID2, TaskAttemptEventType.TA_KILL_REQUEST);
        TaskAttemptEvent taskAttemptEvent4 = new TaskAttemptEvent(tezTaskAttemptID3, TaskAttemptEventType.TA_KILL_REQUEST);
        TaskEvent taskEvent = new TaskEvent(tezTaskID, TaskEventType.T_ATTEMPT_KILLED);
        TaskEvent taskEvent2 = new TaskEvent(tezTaskID, TaskEventType.T_ATTEMPT_FAILED);
        TaskEvent taskEvent3 = new TaskEvent(tezTaskID2, TaskEventType.T_ATTEMPT_FAILED);
        Assert.assertEquals(taskAttemptEvent.getSerializingHash(), taskAttemptEvent2.getSerializingHash());
        Assert.assertEquals(taskAttemptEvent2.getSerializingHash(), taskAttemptEvent3.getSerializingHash());
        Assert.assertEquals(taskEvent2.getSerializingHash(), taskEvent.getSerializingHash());
        Assert.assertEquals(taskAttemptEvent.getSerializingHash(), taskEvent.getSerializingHash());
        Assert.assertEquals(taskAttemptEvent4.getSerializingHash(), taskEvent3.getSerializingHash());
        Assert.assertFalse(taskEvent2.getSerializingHash() == taskEvent3.getSerializingHash());
    }

    @Test(timeout = 5000)
    public void testCompletedAtSubmitted() throws ServicePluginException {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.STARTING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(4))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 4 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, verifyEventType(forClass2.getAllValues().subList(4, i), TaskEventTASucceeded.class, 1).getType());
        verifyEventType(forClass2.getAllValues().subList(4, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(4, i), DAGEventCounterUpdate.class, 1);
    }

    @Test(timeout = 5000)
    public void testSuccess() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        configuration.setBoolean("tez.am.speculation.enabled", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(6))).handle((Event) forClass.capture());
        verifyEventType(forClass.getAllValues().subList(0, 6), SpeculatorEventTaskAttemptStatusUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventStatusUpdate(id, new TaskStatusUpdateEvent((TezCounters) null, 0.1f, (TaskStatistics) null, false)));
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 6 + 5;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        Assert.assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, verifyEventType(forClass2.getAllValues().subList(6, i), TaskEventTASucceeded.class, 1).getType());
        verifyEventType(forClass2.getAllValues().subList(6, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), DAGEventCounterUpdate.class, 1);
        verifyEventType(forClass2.getAllValues().subList(6, i), SpeculatorEventTaskAttemptStatusUpdate.class, 2);
    }

    @Test(timeout = 5000)
    public void testContainerPreemptedAfterSuccess() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", mockTaskAttemptImpl.getState(), TaskAttemptState.RUNNING);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTASucceeded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i + 0))).handle((Event) ArgumentCaptor.forClass(Event.class).capture());
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        Assert.assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, mockTaskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testNodeFailedNonLeafVertex() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTASucceeded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventNodeFailed(id, "NodeDecomissioned", TaskAttemptTerminationCause.NODE_FAILED));
        Assert.assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        Assert.assertEquals(true, Boolean.valueOf(mockTaskAttemptImpl.inputFailedReported));
        int i2 = i + 2;
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i2))).handle((Event) forClass3.capture());
        verifyEventType(forClass3.getAllValues().subList(i, i2), TaskEventTAKilled.class, 1);
        Assert.assertEquals("Task attempt is not in the  KILLED state", TaskAttemptState.KILLED, mockTaskAttemptImpl.getState());
        Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED, mockTaskAttemptImpl.getTerminationCause());
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, mockTaskAttemptImpl.getInternalState());
        mockTaskAttemptImpl.handle(new TaskAttemptEventTezEventUpdate(mockTaskAttemptImpl.getID(), Collections.EMPTY_LIST));
        Assert.assertFalse("InternalError occurred trying to handle TA_TEZ_EVENT_UPDATE in KILLED state", mockEventHandler.internalError);
    }

    @Test(timeout = 5000)
    public void testNodeFailedLeafVertex() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), true);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        Assert.assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(5))).handle((Event) forClass.capture());
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        Assert.assertEquals(0L, mockTaskAttemptImpl.getDiagnostics().size());
        int i = 5 + 3;
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Event.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass2.capture());
        verifyEventType(forClass2.getAllValues().subList(5, i), TaskEventTASucceeded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), AMSchedulerEventTAEnded.class, 1);
        verifyEventType(forClass2.getAllValues().subList(5, i), DAGEventCounterUpdate.class, 1);
        mockTaskAttemptImpl.handle(new TaskAttemptEventNodeFailed(id, "NodeDecomissioned", TaskAttemptTerminationCause.NODE_FAILED));
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i + 0))).handle((Event) ArgumentCaptor.forClass(Event.class).capture());
        Assert.assertEquals("Task attempt is not in the  SUCCEEDED state", TaskAttemptState.SUCCEEDED, mockTaskAttemptImpl.getState());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        Assert.assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, mockTaskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testMultipleOutputFailed() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezVertexID tezVertexID = TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1);
        TezTaskID tezTaskID = TezTaskID.getInstance(tezVertexID, 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        HistoryEventHandler historyEventHandler = (HistoryEventHandler) Mockito.mock(HistoryEventHandler.class);
        ((AppContext) Mockito.doReturn(historyEventHandler).when(this.appCtx)).getHistoryHandler();
        DAGImpl dAGImpl = (DAGImpl) Mockito.mock(DAGImpl.class);
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(DAGHistoryEvent.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(8))).handle((Event) forClass.capture());
        ((HistoryEventHandler) Mockito.verify(historyEventHandler, Mockito.times(2))).handle((DAGHistoryEvent) forClass2.capture());
        long finishTime = ((DAGHistoryEvent) forClass2.getValue()).getHistoryEvent().getFinishTime();
        verifyEventType(forClass.getAllValues(), TaskEventTAUpdate.class, 2);
        InputReadErrorEvent create = InputReadErrorEvent.create("", 0, 1);
        EventMetaData eventMetaData = (EventMetaData) Mockito.mock(EventMetaData.class);
        TezTaskAttemptID tezTaskAttemptID = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        Mockito.when(eventMetaData.getTaskAttemptID()).thenReturn(tezTaskAttemptID);
        TezTaskID tezTaskID2 = (TezTaskID) Mockito.mock(TezTaskID.class);
        TezVertexID tezVertexID2 = (TezVertexID) Mockito.mock(TezVertexID.class);
        Mockito.when(tezTaskAttemptID.getTaskID()).thenReturn(tezTaskID2);
        Mockito.when(tezTaskID2.getVertexID()).thenReturn(tezVertexID2);
        Vertex vertex = (Vertex) Mockito.mock(VertexImpl.class);
        Mockito.when(Integer.valueOf(vertex.getRunningTasks())).thenReturn(11);
        Mockito.when(dAGImpl.getVertex(tezVertexID2)).thenReturn(vertex);
        Mockito.when(this.appCtx.getCurrentDAG()).thenReturn(dAGImpl);
        TezEvent tezEvent = new TezEvent(create, eventMetaData);
        mockTaskAttemptImpl.handle(new TaskAttemptEventOutputFailed(id, tezEvent, 11));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        mockTaskAttemptImpl.handle(new TaskAttemptEventOutputFailed(id, tezEvent, 11));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        Assert.assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, mockTaskAttemptImpl.getTerminationCause());
        TezTaskAttemptID tezTaskAttemptID2 = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        Mockito.when(eventMetaData.getTaskAttemptID()).thenReturn(tezTaskAttemptID2);
        TezTaskID tezTaskID3 = (TezTaskID) Mockito.mock(TezTaskID.class);
        TezVertexID tezVertexID3 = (TezVertexID) Mockito.mock(TezVertexID.class);
        Mockito.when(tezTaskAttemptID2.getTaskID()).thenReturn(tezTaskID3);
        Mockito.when(tezTaskID3.getVertexID()).thenReturn(tezVertexID3);
        Vertex vertex2 = (Vertex) Mockito.mock(VertexImpl.class);
        Mockito.when(Integer.valueOf(vertex2.getRunningTasks())).thenReturn(11);
        Mockito.when(dAGImpl.getVertex(tezVertexID3)).thenReturn(vertex2);
        mockTaskAttemptImpl.handle(new TaskAttemptEventOutputFailed(id, tezEvent, 11));
        Assert.assertEquals("Task attempt is not in FAILED state", mockTaskAttemptImpl.getState(), TaskAttemptState.FAILED);
        Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, mockTaskAttemptImpl.getTerminationCause());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id);
        ((HistoryEventHandler) Mockito.verify(historyEventHandler, Mockito.times(3))).handle((DAGHistoryEvent) forClass2.capture());
        TaskAttemptFinishedEvent historyEvent = ((DAGHistoryEvent) forClass2.getValue()).getHistoryEvent();
        Assert.assertEquals(TaskFailureType.NON_FATAL, historyEvent.getTaskFailureType());
        Assert.assertEquals(finishTime, historyEvent.getFinishTime());
        Assert.assertEquals(true, Boolean.valueOf(mockTaskAttemptImpl.inputFailedReported));
        int i = 8 + 2;
        forClass.getAllValues().clear();
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass.capture());
        Assert.assertEquals(TaskFailureType.NON_FATAL, verifyEventType(forClass.getAllValues().subList(8, i), TaskEventTAFailed.class, 1).getTaskFailureType());
        mockTaskAttemptImpl.handle(new TaskAttemptEventOutputFailed(id, tezEvent, 1));
        Assert.assertEquals("Task attempt is not in FAILED state, still", mockTaskAttemptImpl.getState(), TaskAttemptState.FAILED);
        Assert.assertFalse("InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES", mockEventHandler.internalError);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(i))).handle((Event) forClass.capture());
        Configuration configuration2 = new Configuration(this.vertexConf);
        configuration2.setInt("tez.task.max.allowed.output.failures", 1);
        createMockVertex(configuration2);
        MockTaskAttemptImpl mockTaskAttemptImpl2 = new MockTaskAttemptImpl(TezTaskID.getInstance(tezVertexID, 2), 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id2 = mockTaskAttemptImpl2.getID();
        mockTaskAttemptImpl2.handle(new TaskAttemptEventSchedule(id2, 0, 0));
        mockTaskAttemptImpl2.handle(new TaskAttemptEventSubmitted(id2, newInstance5));
        mockTaskAttemptImpl2.handle(new TaskAttemptEventStartedRemotely(id2));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id2);
        mockTaskAttemptImpl2.handle(new TaskAttemptEvent(id2, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl2.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id2);
        InputReadErrorEvent create2 = InputReadErrorEvent.create("", 1, 1);
        EventMetaData eventMetaData2 = (EventMetaData) Mockito.mock(EventMetaData.class);
        TezTaskAttemptID tezTaskAttemptID3 = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        Mockito.when(tezTaskAttemptID3.getTaskID()).thenReturn(tezTaskID3);
        Mockito.when(eventMetaData2.getTaskAttemptID()).thenReturn(tezTaskAttemptID3);
        mockTaskAttemptImpl2.handle(new TaskAttemptEventOutputFailed(id2, new TezEvent(create2, eventMetaData2), 8));
        Assert.assertEquals("Task attempt is not in failed state", mockTaskAttemptImpl2.getState(), TaskAttemptState.FAILED);
        Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, mockTaskAttemptImpl2.getTerminationCause());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id2);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        Configuration configuration3 = new Configuration(this.vertexConf);
        configuration3.setInt("tez.task.max.allowed.output.failures", 10);
        configuration3.setInt("tez.am.max.allowed.time-sec.for-read-error", 1);
        createMockVertex(configuration3);
        MockTaskAttemptImpl mockTaskAttemptImpl3 = new MockTaskAttemptImpl(TezTaskID.getInstance(tezVertexID, 3), 1, mockEventHandler, createMockTaskAttemptListener, configuration, clock, taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id3 = mockTaskAttemptImpl3.getID();
        mockTaskAttemptImpl3.handle(new TaskAttemptEventSchedule(id3, 0, 0));
        mockTaskAttemptImpl3.handle(new TaskAttemptEventSubmitted(id3, newInstance5));
        mockTaskAttemptImpl3.handle(new TaskAttemptEventStartedRemotely(id3));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id3);
        mockTaskAttemptImpl3.handle(new TaskAttemptEvent(id3, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl3.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id3);
        InputReadErrorEvent create3 = InputReadErrorEvent.create("", 1, 1);
        EventMetaData eventMetaData3 = (EventMetaData) Mockito.mock(EventMetaData.class);
        TezTaskAttemptID tezTaskAttemptID4 = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        Mockito.when(tezTaskAttemptID4.getTaskID()).thenReturn(tezTaskID3);
        Mockito.when(eventMetaData3.getTaskAttemptID()).thenReturn(tezTaskAttemptID4);
        TezEvent tezEvent2 = new TezEvent(create3, eventMetaData3);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(1000L);
        Mockito.when(Integer.valueOf(vertex2.getRunningTasks())).thenReturn(1000);
        mockTaskAttemptImpl3.handle(new TaskAttemptEventOutputFailed(id3, tezEvent2, 1000));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl3.getState(), TaskAttemptState.SUCCEEDED);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(1500L);
        mockTaskAttemptImpl3.handle(new TaskAttemptEventOutputFailed(id3, tezEvent2, 1000));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl3.getState(), TaskAttemptState.SUCCEEDED);
        Mockito.when(Long.valueOf(clock.getTime())).thenReturn(2001L);
        mockTaskAttemptImpl3.handle(new TaskAttemptEventOutputFailed(id3, tezEvent2, 1000));
        Assert.assertEquals("Task attempt is not in FAILED state", mockTaskAttemptImpl3.getState(), TaskAttemptState.FAILED);
        Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, mockTaskAttemptImpl3.getTerminationCause());
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler, Mockito.times(1))).unregister(id3);
    }

    @Test(timeout = 60000)
    public void testTAFailureBasedOnRunningTasks() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        HistoryEventHandler historyEventHandler = (HistoryEventHandler) Mockito.mock(HistoryEventHandler.class);
        ((AppContext) Mockito.doReturn(historyEventHandler).when(this.appCtx)).getHistoryHandler();
        DAGImpl dAGImpl = (DAGImpl) Mockito.mock(DAGImpl.class);
        TaskHeartbeatHandler taskHeartbeatHandler = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), taskHeartbeatHandler, this.appCtx, false, newInstance3, createFakeContainerContext(), false);
        TezTaskAttemptID id = mockTaskAttemptImpl.getID();
        mockTaskAttemptImpl.handle(new TaskAttemptEventSchedule(id, 0, 0));
        mockTaskAttemptImpl.handle(new TaskAttemptEventSubmitted(id, newInstance5));
        mockTaskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(id));
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).register(id);
        mockTaskAttemptImpl.handle(new TaskAttemptEvent(id, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals("Task attempt is not in succeeded state", mockTaskAttemptImpl.getState(), TaskAttemptState.SUCCEEDED);
        ((TaskHeartbeatHandler) Mockito.verify(taskHeartbeatHandler)).unregister(id);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Event.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(DAGHistoryEvent.class);
        ((MockEventHandler) Mockito.verify(mockEventHandler, Mockito.times(8))).handle((Event) forClass.capture());
        ((HistoryEventHandler) Mockito.verify(historyEventHandler, Mockito.times(2))).handle((DAGHistoryEvent) forClass2.capture());
        ((DAGHistoryEvent) forClass2.getValue()).getHistoryEvent().getFinishTime();
        verifyEventType(forClass.getAllValues(), TaskEventTAUpdate.class, 2);
        InputReadErrorEvent create = InputReadErrorEvent.create("", 0, 1);
        EventMetaData eventMetaData = (EventMetaData) Mockito.mock(EventMetaData.class);
        TezTaskAttemptID tezTaskAttemptID = (TezTaskAttemptID) Mockito.mock(TezTaskAttemptID.class);
        Mockito.when(eventMetaData.getTaskAttemptID()).thenReturn(tezTaskAttemptID);
        TezTaskID tezTaskID2 = (TezTaskID) Mockito.mock(TezTaskID.class);
        TezVertexID tezVertexID = (TezVertexID) Mockito.mock(TezVertexID.class);
        Mockito.when(tezTaskAttemptID.getTaskID()).thenReturn(tezTaskID2);
        Mockito.when(tezTaskID2.getVertexID()).thenReturn(tezVertexID);
        Vertex vertex = (Vertex) Mockito.mock(VertexImpl.class);
        Mockito.when(Integer.valueOf(vertex.getRunningTasks())).thenReturn(5);
        Mockito.when(dAGImpl.getVertex(tezVertexID)).thenReturn(vertex);
        Mockito.when(this.appCtx.getCurrentDAG()).thenReturn(dAGImpl);
        mockTaskAttemptImpl.handle(new TaskAttemptEventOutputFailed(id, new TezEvent(create, eventMetaData), 11));
        Assert.assertEquals("Task attempt is not in FAILED state", mockTaskAttemptImpl.getState(), TaskAttemptState.FAILED);
    }

    @Test(timeout = 5000)
    public void testKilledInNew() throws ServicePluginException {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 2);
        ApplicationAttemptId newInstance2 = ApplicationAttemptId.newInstance(newInstance, 0);
        TezTaskID tezTaskID = TezTaskID.getInstance(TezVertexID.getInstance(TezDAGID.getInstance(newInstance, 1), 1), 1);
        MockEventHandler mockEventHandler = (MockEventHandler) Mockito.spy(new MockEventHandler());
        TaskCommunicatorManagerInterface createMockTaskAttemptListener = createMockTaskAttemptListener();
        Configuration configuration = new Configuration();
        configuration.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
        configuration.setBoolean("fs.file.impl.disable.cache", true);
        this.locationHint = TaskLocationHint.createTaskLocationHint(new HashSet(Arrays.asList("127.0.0.1")), (Set) null);
        Resource newInstance3 = Resource.newInstance(1024, 1);
        NodeId newInstance4 = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance5 = ContainerId.newInstance(newInstance2, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance5);
        Mockito.when(container.getNodeId()).thenReturn(newInstance4);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appCtx);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(new ClusterInfo()).when(this.appCtx)).getClusterInfo();
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appCtx)).getAllContainers();
        MockTaskAttemptImpl mockTaskAttemptImpl = new MockTaskAttemptImpl(tezTaskID, 1, mockEventHandler, createMockTaskAttemptListener, configuration, new SystemClock(), (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class), this.appCtx, false, newInstance3, createFakeContainerContext(), true);
        Assert.assertEquals(TaskAttemptStateInternal.NEW, mockTaskAttemptImpl.getInternalState());
        mockTaskAttemptImpl.handle(new TaskAttemptEventKillRequest(mockTaskAttemptImpl.getID(), "kill it", TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, mockTaskAttemptImpl.getInternalState());
        Assert.assertEquals(0L, mockTaskAttemptImpl.taskAttemptStartedEventLogged);
        Assert.assertEquals(1L, mockTaskAttemptImpl.taskAttemptFinishedEventLogged);
    }

    private Event verifyEventType(List<Event> list, Class<? extends Event> cls, int i) {
        int i2 = 0;
        Event event = null;
        for (Event event2 : list) {
            if (cls.isInstance(event2)) {
                i2++;
                event = event2;
            }
        }
        Assert.assertEquals("Mismatch in num occurences of event: " + cls.getCanonicalName(), i, i2);
        return event;
    }

    private static ContainerContext createFakeContainerContext() {
        return new ContainerContext(new HashMap(), new Credentials(), new HashMap(), "");
    }

    private TaskCommunicatorManagerInterface createMockTaskAttemptListener() throws ServicePluginException {
        TaskCommunicatorManagerInterface taskCommunicatorManagerInterface = (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class);
        TaskCommunicator taskCommunicator = (TaskCommunicator) Mockito.mock(TaskCommunicator.class);
        ((TaskCommunicator) Mockito.doReturn(new InetSocketAddress("localhost", 0)).when(taskCommunicator)).getAddress();
        ((TaskCommunicatorManagerInterface) Mockito.doReturn(new TaskCommunicatorWrapper(taskCommunicator)).when(taskCommunicatorManagerInterface)).getTaskCommunicator(0);
        return taskCommunicatorManagerInterface;
    }
}
