/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

public class TestVertexRecovery {
    private static final Log LOG = LogFactory.getLog(TestVertexRecovery.class);
    private DrainDispatcher dispatcher;
    private AppContext mockAppContext;
    private ApplicationId appId = ApplicationId.newInstance((long)System.currentTimeMillis(), (int)1);
    private DAGImpl dag;
    private TezDAGID dagId = TezDAGID.getInstance((ApplicationId)this.appId, (int)1);
    private String user = "user";
    private long initRequestedTime = 100L;
    private long initedTime = this.initRequestedTime + 100L;
    private VertexEventHanlder vertexEventHandler;
    private TaskEventHandler taskEventHandler;

    private DAGProtos.DAGPlan createDAGPlan() {
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    @Before
    public void setUp() throws IOException {
        this.dispatcher = new DrainDispatcher();
        this.dispatcher.register(DAGEventType.class, (EventHandler)Mockito.mock(EventHandler.class));
        this.vertexEventHandler = new VertexEventHanlder();
        this.dispatcher.register(VertexEventType.class, (EventHandler)this.vertexEventHandler);
        this.taskEventHandler = new TaskEventHandler();
        this.dispatcher.register(TaskEventType.class, (EventHandler)this.taskEventHandler);
        this.dispatcher.register(TaskAttemptEventType.class, (EventHandler)new TaskAttemptEventHandler());
        this.dispatcher.init(new Configuration());
        this.dispatcher.start();
        this.mockAppContext = (AppContext)Mockito.mock(AppContext.class, (Answer)Mockito.RETURNS_DEEP_STUBS);
        DAGProtos.DAGPlan dagPlan = this.createDAGPlan();
        this.dag = new DAGImpl(this.dagId, new Configuration(), dagPlan, this.dispatcher.getEventHandler(), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), new Credentials(), (Clock)new SystemClock(), this.user, (TaskHeartbeatHandler)Mockito.mock(TaskHeartbeatHandler.class), this.mockAppContext);
        Mockito.when((Object)this.mockAppContext.getCurrentDAG()).thenReturn((Object)this.dag);
        this.dag.handle(new DAGEvent(this.dagId, DAGEventType.DAG_INIT));
        LOG.info((Object)"finish setUp");
    }

    @Test
    public void testRecovery_Desired_SUCCEEDED() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertex1.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.numTasks, (long)vertex1.succeededTaskCount);
        Assert.assertEquals((long)vertex1.numTasks, (long)vertex1.completedTaskCount);
        this.assertTaskRecoveredEventSent(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.NEW, (Object)vertex3.getState());
        Assert.assertEquals((long)0L, (long)this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_FAILED() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertex1.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.numTasks, (long)vertex1.failedTaskCount);
        Assert.assertEquals((long)0L, (long)vertex1.completedTaskCount);
        this.assertTaskRecoveredEventSent(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.NEW, (Object)vertex3.getState());
        Assert.assertEquals((long)0L, (long)this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_KILLED() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertex1.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.numTasks, (long)vertex1.killedTaskCount);
        Assert.assertEquals((long)0L, (long)vertex1.completedTaskCount);
        this.assertTaskRecoveredEventSent(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.NEW, (Object)vertex3.getState());
        Assert.assertEquals((long)0L, (long)this.vertexEventHandler.getEvents().size());
    }

    @Test
    public void testRecovery_Desired_ERROR() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, vertex1.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.ERROR, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.numTasks, (long)vertex1.failedTaskCount);
        Assert.assertEquals((long)0L, (long)vertex1.completedTaskCount);
        this.assertTaskRecoveredEventSent(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.NEW, (Object)vertex3.getState());
        Assert.assertEquals((long)0L, (long)this.vertexEventHandler.getEvents().size());
    }

    private TezEvent createTezEvent() {
        return new TezEvent((Event)InputDataInformationEvent.createWithSerializedPayload((int)0, (ByteBuffer)ByteBuffer.allocate(0)), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", null, null));
    }

    @Test
    public void testRecovery_New_Desired_RUNNING() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex1.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.NEW, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex1.recoveredEvents.size());
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)vertex1.recoveredEvents.size());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
    }

    private void assertTaskRecoveredEventSent(VertexImpl vertex) {
        int sentNum = 0;
        for (TaskEvent event : this.taskEventHandler.getEvents()) {
            TaskEventRecoverTask recoverEvent;
            if (event.getType() != TaskEventType.T_RECOVER || !(recoverEvent = (TaskEventRecoverTask)event).getTaskID().getVertexID().equals((Object)vertex.getVertexId())) continue;
            ++sentNum;
        }
        Assert.assertEquals((String)("expect " + vertex.getTotalTasks() + " TaskEventTaskRecover sent for vertex:" + vertex.getVertexId() + "but actuall sent " + sentNum), (long)vertex.getTotalTasks(), (long)sentNum);
    }

    private void assertOutputCommitters(VertexImpl vertex) {
        Assert.assertTrue((vertex.getOutputCommitters() != null ? 1 : 0) != 0);
        for (OutputCommitter c : vertex.getOutputCommitters().values()) {
            TestVertexImpl.CountingOutputCommitter committer = (TestVertexImpl.CountingOutputCommitter)c;
            Assert.assertEquals((long)0L, (long)committer.abortCounter);
            Assert.assertEquals((long)0L, (long)committer.commitCounter);
            Assert.assertEquals((long)1L, (long)committer.initCounter);
            Assert.assertEquals((long)1L, (long)committer.setupCounter);
        }
    }

    private void restoreFromInitializedEvent(VertexImpl vertex) {
        long initTimeRequested = 100L;
        long initedTime = initTimeRequested + 100L;
        VertexState recoveredState = vertex.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex.getVertexId(), "vertex1", initTimeRequested, initedTime, vertex.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        Assert.assertEquals((long)vertex.getTotalTasks(), (long)vertex.getTasks().size());
        Assert.assertEquals((long)initTimeRequested, (long)vertex.initTimeRequested);
        Assert.assertEquals((long)initedTime, (long)vertex.initedTime);
    }

    @Test
    public void testRecovery_Inited_Desired_RUNNING() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex1.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)vertex1.recoveredEvents.size());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_Started_Desired_RUNNING() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        long startTimeRequested = this.initedTime + 100L;
        long startedTime = startTimeRequested + 100L;
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex1.getVertexId(), startTimeRequested, startedTime));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        Assert.assertEquals((long)startTimeRequested, (long)vertex1.startTimeRequested);
        Assert.assertEquals((long)startedTime, (long)vertex1.startedTime);
        recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex1.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex1.recoveredEvents.size());
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)vertex1.recoveredEvents.size());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_Finished_Desired_RUNNING() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        long startRequestedTime = this.initedTime + 100L;
        long startTime = startRequestedTime + 100L;
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex1.getVertexId(), startRequestedTime, startTime));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        long finishTime = startTime + 100L;
        recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexFinishedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, startRequestedTime, startTime, finishTime, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
        Assert.assertEquals((long)finishTime, (long)vertex1.finishTime);
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)recoveredState);
        Assert.assertEquals((Object)false, (Object)vertex1.recoveryCommitInProgress);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)vertex1.recoveredEvents.size());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
    }

    @Test
    public void testRecovery_RecoveringFromNew() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        Assert.assertEquals((long)1L, (long)vertex1.getTasks().size());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        VertexState recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex3.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.NEW, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex3.recoveredEvents.size());
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        VertexImpl vertex2 = (VertexImpl)this.dag.getVertex("vertex2");
        vertex2.handle((VertexEvent)new VertexEventRecoverVertex(vertex2.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex2.getState());
        Assert.assertNull((Object)vertex2.getOutputCommitters());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex3.getState());
        Assert.assertEquals((long)2L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        Assert.assertEquals((long)0L, (long)vertex3.recoveredEvents.size());
        this.assertOutputCommitters(vertex3);
    }

    @Test
    public void testRecovery_VertexManagerErrorOnRecovery() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.getTotalTasks(), (long)vertex1.getTasks().size());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        VertexState recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex3.getVertexId(), "vertex3", this.initRequestedTime, this.initedTime, 0, "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        vertex3.handle((VertexEvent)new VertexEventManagerUserCodeError(vertex3.getVertexId(), new AMUserCodeException(AMUserCodeException.Source.VertexManager, (Throwable)new TezUncheckedException("test"))));
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        VertexImpl vertex2 = (VertexImpl)this.dag.getVertex("vertex2");
        this.restoreFromInitializedEvent(vertex2);
        vertex2.handle((VertexEvent)new VertexEventRecoverVertex(vertex2.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex2.getState());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)vertex3.getState());
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)vertex3.getTerminationCause());
        Assert.assertEquals((long)2L, (long)vertex3.numRecoveredSourceVertices);
    }

    @Test
    public void testRecovery_RecoveringFromInited() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        Assert.assertEquals((long)vertex1.getTotalTasks(), (long)vertex1.getTasks().size());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        VertexState recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex3.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.NEW, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex3.recoveredEvents.size());
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex3.getVertexId(), "vertex3", this.initRequestedTime, this.initedTime, 2, "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        VertexImpl vertex2 = (VertexImpl)this.dag.getVertex("vertex2");
        this.restoreFromInitializedEvent(vertex2);
        vertex2.handle((VertexEvent)new VertexEventRecoverVertex(vertex2.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex2.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex3.getState());
        Assert.assertEquals((long)2L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        Assert.assertEquals((long)0L, (long)vertex3.recoveredEvents.size());
        this.assertOutputCommitters(vertex3);
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertTaskRecoveredEventSent(vertex2);
        this.assertTaskRecoveredEventSent(vertex3);
    }

    @Test
    public void testRecovery_RecoveringFromRunning() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex1.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        Assert.assertEquals((long)1L, (long)vertex1.getTasks().size());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex3.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.NEW, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex3.recoveredEvents.size());
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex3.getVertexId(), "vertex3", this.initRequestedTime, this.initedTime, vertex3.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex3.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        VertexImpl vertex2 = (VertexImpl)this.dag.getVertex("vertex2");
        recoveredState = vertex2.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex2.getVertexId(), "vertex2", this.initRequestedTime, this.initedTime, vertex2.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        recoveredState = vertex2.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex2.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        vertex2.handle((VertexEvent)new VertexEventRecoverVertex(vertex2.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex2.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex3.getState());
        Assert.assertEquals((long)2L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        Assert.assertEquals((long)0L, (long)vertex3.recoveredEvents.size());
        this.assertOutputCommitters(vertex3);
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertTaskRecoveredEventSent(vertex2);
        this.assertTaskRecoveredEventSent(vertex3);
    }

    @Test
    public void testRecovery_RecoveringFromSUCCEEDED() {
        VertexImpl vertex1 = (VertexImpl)this.dag.getVertex("vertex1");
        this.restoreFromInitializedEvent(vertex1);
        VertexState recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex1.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        recoveredState = vertex1.restoreFromEvent((HistoryEvent)new VertexFinishedEvent(vertex1.getVertexId(), "vertex1", this.initRequestedTime, this.initedTime, this.initRequestedTime + 300L, this.initRequestedTime + 400L, this.initRequestedTime + 500L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)recoveredState);
        vertex1.handle((VertexEvent)new VertexEventRecoverVertex(vertex1.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex1.getState());
        Assert.assertEquals((long)1L, (long)vertex1.getTasks().size());
        this.assertOutputCommitters(vertex1);
        VertexImpl vertex3 = (VertexImpl)this.dag.getVertex("vertex3");
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexRecoverableEventsGeneratedEvent(vertex3.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{this.createTezEvent()})));
        Assert.assertEquals((Object)VertexState.NEW, (Object)recoveredState);
        Assert.assertEquals((long)1L, (long)vertex3.recoveredEvents.size());
        this.restoreFromInitializedEvent(vertex3);
        recoveredState = vertex3.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex3.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        Assert.assertEquals((Object)VertexState.RECOVERING, (Object)vertex3.getState());
        Assert.assertEquals((long)1L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        VertexImpl vertex2 = (VertexImpl)this.dag.getVertex("vertex2");
        recoveredState = vertex2.restoreFromEvent((HistoryEvent)new VertexInitializedEvent(vertex2.getVertexId(), "vertex2", this.initRequestedTime, this.initedTime, vertex2.getTotalTasks(), "", null));
        Assert.assertEquals((Object)VertexState.INITED, (Object)recoveredState);
        recoveredState = vertex2.restoreFromEvent((HistoryEvent)new VertexStartedEvent(vertex2.getVertexId(), this.initRequestedTime + 100L, this.initRequestedTime + 200L));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)recoveredState);
        vertex2.handle((VertexEvent)new VertexEventRecoverVertex(vertex2.getVertexId(), VertexState.RUNNING));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex2.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vertex3.getState());
        Assert.assertEquals((long)2L, (long)vertex3.numRecoveredSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numInitedSourceVertices);
        Assert.assertEquals((long)2L, (long)vertex3.numStartedSourceVertices);
        Assert.assertEquals((long)1L, (long)vertex3.getDistanceFromRoot());
        Assert.assertEquals((long)0L, (long)vertex3.recoveredEvents.size());
        this.assertOutputCommitters(vertex3);
        this.assertTaskRecoveredEventSent(vertex1);
        this.assertTaskRecoveredEventSent(vertex2);
        this.assertTaskRecoveredEventSent(vertex3);
    }

    class TaskAttemptEventHandler
    implements EventHandler<TaskAttemptEvent> {
        TaskAttemptEventHandler() {
        }

        public void handle(TaskAttemptEvent event) {
        }
    }

    class TaskEventHandler
    implements EventHandler<TaskEvent> {
        private List<TaskEvent> events = new ArrayList<TaskEvent>();

        TaskEventHandler() {
        }

        public void handle(TaskEvent event) {
            this.events.add(event);
            ((TaskImpl)TestVertexRecovery.this.dag.getVertex(event.getTaskID().getVertexID()).getTask(event.getTaskID())).handle(event);
        }

        public List<TaskEvent> getEvents() {
            return this.events;
        }
    }

    class VertexEventHanlder
    implements EventHandler<VertexEvent> {
        private List<VertexEvent> events = new ArrayList<VertexEvent>();

        VertexEventHanlder() {
        }

        public void handle(VertexEvent event) {
            this.events.add(event);
            ((VertexImpl)TestVertexRecovery.this.dag.getVertex(event.getVertexId())).handle(event);
        }

        public List<VertexEvent> getEvents() {
            return this.events;
        }
    }
}

