package org.apache.tez.dag.app.dag.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.TezAbstractEvent;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.GraceShuffleVertexManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl.class */
public class TestVertexImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TestVertexImpl.class);
    private ListeningExecutorService execService;
    private TezDAGID dagId;
    private ApplicationAttemptId appAttemptId;
    private DAGProtos.DAGPlan dagPlan;
    private DAGProtos.DAGPlan invalidDagPlan;
    private Map<String, VertexImpl> vertices;
    private Map<TezVertexID, VertexImpl> vertexIdMap;
    private DrainDispatcher dispatcher;
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private TaskHeartbeatHandler thh;
    private AppContext appContext;
    private Configuration conf;
    private Map<String, Edge> edges;
    private Map<String, DAGImpl.VertexGroupInfo> vertexGroups;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    private AMSchedulerEventDispatcher amSchedulerEventDispatcher;
    private HistoryEventHandler historyEventHandler;
    private TestStateChangeNotifier.StateChangeNotifierForTest updateTracker;
    private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;
    private boolean useCustomInitializer = false;
    private InputInitializer customInitializer = null;
    private Clock clock = new SystemClock();
    private VertexLocationHint vertexLocationHint = null;
    private byte[] edgePayload = "EP".getBytes();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$AMSchedulerEventDispatcher.class */
    public class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> {
        List<AMSchedulerEvent> events;

        private AMSchedulerEventDispatcher() {
            this.events = new ArrayList();
        }

        public void handle(AMSchedulerEvent aMSchedulerEvent) {
            this.events.add(aMSchedulerEvent);
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$ContextSettableInputInitialzier.class */
    private interface ContextSettableInputInitialzier {
        void setContext(InputInitializerContext inputInitializerContext);
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$CountingOutputCommitter.class */
    public static class CountingOutputCommitter extends OutputCommitter {
        public int initCounter;
        public int setupCounter;
        public int commitCounter;
        public int abortCounter;
        private boolean throwError;
        private boolean throwErrorOnAbort;
        private boolean throwRuntimeException;

        /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$CountingOutputCommitter$CountingOutputCommitterConfig.class */
        public static class CountingOutputCommitterConfig implements Writable {
            boolean throwError;
            boolean throwErrorOnAbort;
            boolean throwRuntimeException;

            public CountingOutputCommitterConfig() {
                this.throwError = false;
                this.throwErrorOnAbort = false;
                this.throwRuntimeException = false;
            }

            public CountingOutputCommitterConfig(boolean z, boolean z2, boolean z3) {
                this.throwError = false;
                this.throwErrorOnAbort = false;
                this.throwRuntimeException = false;
                this.throwError = z;
                this.throwErrorOnAbort = z2;
                this.throwRuntimeException = z3;
            }

            public CountingOutputCommitterConfig(UserPayload userPayload) throws IOException {
                this.throwError = false;
                this.throwErrorOnAbort = false;
                this.throwRuntimeException = false;
                DataInputByteBuffer dataInputByteBuffer = new DataInputByteBuffer();
                dataInputByteBuffer.reset(new ByteBuffer[]{userPayload.getPayload()});
                readFields(dataInputByteBuffer);
            }

            public void write(DataOutput dataOutput) throws IOException {
                dataOutput.writeBoolean(this.throwError);
                dataOutput.writeBoolean(this.throwErrorOnAbort);
                dataOutput.writeBoolean(this.throwRuntimeException);
            }

            public void readFields(DataInput dataInput) throws IOException {
                this.throwError = dataInput.readBoolean();
                this.throwErrorOnAbort = dataInput.readBoolean();
                this.throwRuntimeException = dataInput.readBoolean();
            }

            public byte[] toUserPayload() throws IOException {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                write(new DataOutputStream(byteArrayOutputStream));
                return byteArrayOutputStream.toByteArray();
            }
        }

        public CountingOutputCommitter(OutputCommitterContext outputCommitterContext) {
            super(outputCommitterContext);
            this.initCounter = 0;
            this.setupCounter = 0;
            this.commitCounter = 0;
            this.abortCounter = 0;
            this.throwError = false;
            this.throwErrorOnAbort = false;
            this.throwRuntimeException = false;
        }

        public void initialize() throws IOException {
            if (getContext().getUserPayload() != null && getContext().getUserPayload().hasPayload()) {
                CountingOutputCommitterConfig countingOutputCommitterConfig = new CountingOutputCommitterConfig(getContext().getUserPayload());
                this.throwError = countingOutputCommitterConfig.throwError;
                this.throwErrorOnAbort = countingOutputCommitterConfig.throwErrorOnAbort;
                this.throwRuntimeException = countingOutputCommitterConfig.throwRuntimeException;
            }
            this.initCounter++;
        }

        public void setupOutput() throws IOException {
            this.setupCounter++;
        }

        public void commitOutput() throws IOException {
            this.commitCounter++;
            if (this.throwError) {
                if (!this.throwRuntimeException) {
                    throw new IOException("I can throwz exceptions in commit");
                }
                throw new RuntimeException("I can throwz exceptions in commit");
            }
        }

        public void abortOutput(VertexStatus.State state) throws IOException {
            this.abortCounter++;
            if (this.throwErrorOnAbort) {
                if (!this.throwRuntimeException) {
                    throw new IOException("I can throwz exceptions in abort");
                }
                throw new RuntimeException("I can throwz exceptions in abort");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$DagEventDispatcher.class */
    public class DagEventDispatcher implements EventHandler<DAGEvent> {
        public Map<DAGEventType, Integer> eventCount;

        private DagEventDispatcher() {
            this.eventCount = new HashMap();
        }

        public void handle(DAGEvent dAGEvent) {
            int i = 1;
            if (this.eventCount.containsKey(dAGEvent.getType())) {
                i = this.eventCount.get(dAGEvent.getType()).intValue() + 1;
            }
            this.eventCount.put(dAGEvent.getType(), Integer.valueOf(i));
        }
    }

    @InterfaceAudience.Private
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$EventHandlingRootInputInitializer.class */
    public static class EventHandlingRootInputInitializer extends InputInitializer implements ContextSettableInputInitialzier {
        final AtomicBoolean initStarted;
        final AtomicBoolean eventReceived;
        final AtomicBoolean initComplete;
        private final ReentrantLock lock;
        private final Condition eventCondition;
        private final List<VertexStateUpdate> stateUpdates;
        private int numExpectedVertexStateUpdate;
        private Object waitForVertexStateUpdate;
        private final List<InputInitializerEvent> initializerEvents;
        private volatile InputInitializerContext context;
        private volatile int numExpectedEvents;
        private IIExceptionLocation exLocation;

        public EventHandlingRootInputInitializer(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
            this.initStarted = new AtomicBoolean(false);
            this.eventReceived = new AtomicBoolean(false);
            this.initComplete = new AtomicBoolean(false);
            this.lock = new ReentrantLock();
            this.eventCondition = this.lock.newCondition();
            this.stateUpdates = new LinkedList();
            this.numExpectedVertexStateUpdate = 1;
            this.waitForVertexStateUpdate = new Object();
            this.initializerEvents = new LinkedList();
            this.numExpectedEvents = 1;
            this.exLocation = null;
        }

        public EventHandlingRootInputInitializer(InputInitializerContext inputInitializerContext, IIExceptionLocation iIExceptionLocation) {
            super(inputInitializerContext);
            this.initStarted = new AtomicBoolean(false);
            this.eventReceived = new AtomicBoolean(false);
            this.initComplete = new AtomicBoolean(false);
            this.lock = new ReentrantLock();
            this.eventCondition = this.lock.newCondition();
            this.stateUpdates = new LinkedList();
            this.numExpectedVertexStateUpdate = 1;
            this.waitForVertexStateUpdate = new Object();
            this.initializerEvents = new LinkedList();
            this.numExpectedEvents = 1;
            this.exLocation = null;
            this.exLocation = iIExceptionLocation;
        }

        public List<Event> initialize() throws Exception {
            this.context.registerForVertexStateUpdates("vertex1", (Set) null);
            this.initStarted.set(true);
            if (this.exLocation == IIExceptionLocation.Initialize) {
                throw new Exception(this.exLocation.name());
            }
            this.lock.lock();
            try {
                if (!this.eventReceived.get()) {
                    this.eventCondition.await();
                }
                this.initComplete.set(true);
                InputDataInformationEvent createWithSerializedPayload = InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap(new byte[]{0}));
                LinkedList linkedList = new LinkedList();
                linkedList.add(createWithSerializedPayload);
                return linkedList;
            } finally {
                this.lock.unlock();
            }
        }

        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
            if (this.exLocation == IIExceptionLocation.HandleInputInitializerEvent) {
                throw new Exception(this.exLocation.name());
            }
            this.initializerEvents.addAll(list);
            if (this.initializerEvents.size() == this.numExpectedEvents) {
                this.eventReceived.set(true);
                this.lock.lock();
                try {
                    this.eventCondition.signal();
                } finally {
                    this.lock.unlock();
                }
            }
        }

        @Override // org.apache.tez.dag.app.dag.impl.TestVertexImpl.ContextSettableInputInitialzier
        public void setContext(InputInitializerContext inputInitializerContext) {
            this.context = inputInitializerContext;
        }

        public void setNumExpectedEvents(int i) {
            this.numExpectedEvents = i;
        }

        public void setNumVertexStateUpdateEvents(int i) {
            this.numExpectedVertexStateUpdate = i;
        }

        public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
            if (this.exLocation == IIExceptionLocation.OnVertexStateUpdated) {
                throw new RuntimeException(this.exLocation.name());
            }
            this.stateUpdates.add(vertexStateUpdate);
            if (this.stateUpdates.size() == this.numExpectedVertexStateUpdate) {
                synchronized (this.waitForVertexStateUpdate) {
                    this.waitForVertexStateUpdate.notify();
                }
            }
        }

        public void waitForVertexStateUpdate() throws InterruptedException {
            if (this.stateUpdates.size() < this.numExpectedVertexStateUpdate) {
                synchronized (this.waitForVertexStateUpdate) {
                    this.waitForVertexStateUpdate.wait();
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$IIExceptionLocation.class */
    public enum IIExceptionLocation {
        Initialize,
        HandleInputInitializerEvent,
        OnVertexStateUpdated
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$InvocationCountingVertexManager.class */
    public static class InvocationCountingVertexManager extends VertexManagerPlugin {
        static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
        static final AtomicInteger numInitializedInputs = new AtomicInteger(0);

        public InvocationCountingVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        public void initialize() throws Exception {
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) throws Exception {
            numVmEventsReceived.incrementAndGet();
        }

        public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) throws Exception {
            numInitializedInputs.incrementAndGet();
        }
    }

    @InterfaceAudience.Private
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$RootInitializerSettingParallelismTo0.class */
    public static class RootInitializerSettingParallelismTo0 extends InputInitializer {
        private final ReentrantLock lock;
        private final Condition condition;

        public RootInitializerSettingParallelismTo0(InputInitializerContext inputInitializerContext) {
            super(inputInitializerContext);
            this.lock = new ReentrantLock();
            this.condition = this.lock.newCondition();
        }

        public List<Event> initialize() throws Exception {
            InputConfigureVertexTasksEvent create = InputConfigureVertexTasksEvent.create(0, (VertexLocationHint) null, InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
            LinkedList linkedList = new LinkedList();
            linkedList.add(create);
            this.lock.lock();
            try {
                this.condition.await();
                TestVertexImpl.LOG.info("Received signal to proceed. Returning event to set parallelism to 0");
                return linkedList;
            } finally {
                this.lock.unlock();
            }
        }

        public void go() {
            this.lock.lock();
            try {
                TestVertexImpl.LOG.info("Signallying initializer to proceed");
                this.condition.signal();
            } finally {
                this.lock.unlock();
            }
        }

        public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$RootInputInitializerManagerControlled.class */
    public static class RootInputInitializerManagerControlled extends RootInputInitializerManager {
        private List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs;
        private final EventHandler eventHandler;
        private final DrainDispatcher dispatcher;
        private final TezVertexID vertexID;
        private volatile boolean hasShutDown;

        public RootInputInitializerManagerControlled(Vertex vertex, AppContext appContext, EventHandler eventHandler, DrainDispatcher drainDispatcher, StateChangeNotifier stateChangeNotifier) throws IOException {
            super(vertex, appContext, UserGroupInformation.getCurrentUser(), stateChangeNotifier);
            this.hasShutDown = false;
            this.eventHandler = eventHandler;
            this.dispatcher = drainDispatcher;
            this.vertexID = vertex.getVertexId();
        }

        public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> list) {
            this.inputs = list;
        }

        protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput, InputInitializerContext inputInitializerContext) {
            return new InputInitializer(inputInitializerContext) { // from class: org.apache.tez.dag.app.dag.impl.TestVertexImpl.RootInputInitializerManagerControlled.1
                public List<Event> initialize() throws Exception {
                    return null;
                }

                public void handleInputInitializerEvent(List<InputInitializerEvent> list) throws Exception {
                }
            };
        }

        public void shutdown() {
            this.hasShutDown = true;
        }

        public void failInputInitialization() throws TezException {
            super.runInputInitializers(this.inputs);
            this.eventHandler.handle(new VertexEventRootInputFailed(this.vertexID, this.inputs.get(0).getName(), new AMUserCodeException(AMUserCodeException.Source.InputInitializer, new RuntimeException("MockInitializerFailed"))));
            this.dispatcher.await();
        }

        public void completeInputInitialization() {
            this.eventHandler.handle(new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(0).getName(), (List) null));
            this.dispatcher.await();
        }

        public void completeInputDistribution(byte[] bArr) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(1);
            newArrayListWithCapacity.add(InputUpdatePayloadEvent.create(ByteBuffer.wrap(bArr)));
            this.eventHandler.handle(new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(0).getName(), newArrayListWithCapacity));
            this.dispatcher.await();
        }

        public void completeInputInitialization(int i) {
            this.eventHandler.handle(new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(i).getName(), (List) null));
            this.dispatcher.await();
        }

        public void completeInputInitialization(int i, int i2, List<TaskLocationHint> list) {
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i2 + 1);
            newArrayListWithCapacity.add(InputConfigureVertexTasksEvent.create(i2, VertexLocationHint.create(list), (InputSpecUpdate) null));
            for (int i3 = 0; i3 < i2; i3++) {
                newArrayListWithCapacity.add(InputDataInformationEvent.createWithSerializedPayload(i3, (ByteBuffer) null));
            }
            this.eventHandler.handle(new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(i).getName(), newArrayListWithCapacity));
            this.dispatcher.await();
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$RootInputInitializerManagerWithRunningInitializer.class */
    private static class RootInputInitializerManagerWithRunningInitializer extends RootInputInitializerManager {
        private final InputInitializer presetInitializer;

        public RootInputInitializerManagerWithRunningInitializer(Vertex vertex, AppContext appContext, InputInitializer inputInitializer, StateChangeNotifier stateChangeNotifier) throws IOException {
            super(vertex, appContext, UserGroupInformation.getCurrentUser(), stateChangeNotifier);
            this.presetInitializer = inputInitializer;
        }

        protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> rootInputLeafOutput, InputInitializerContext inputInitializerContext) {
            if (this.presetInitializer instanceof ContextSettableInputInitialzier) {
                this.presetInitializer.setContext(inputInitializerContext);
            }
            return this.presetInitializer;
        }
    }

    @InterfaceAudience.Private
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$RootInputSpecUpdaterVertexManager.class */
    public static class RootInputSpecUpdaterVertexManager extends VertexManagerPlugin {
        private static final int NUM_TASKS = 5;

        public RootInputSpecUpdaterVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        public void initialize() {
        }

        public void onVertexStarted(List<TaskAttemptIdentifier> list) {
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
        }

        public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
            HashMap hashMap = new HashMap();
            if (getContext().getUserPayload().deepCopyAsArray()[0] == 0) {
                hashMap.put("input3", InputSpecUpdate.createAllTaskInputSpecUpdate(4));
            } else {
                LinkedList linkedList = new LinkedList();
                for (int i = 1; i <= NUM_TASKS; i++) {
                    linkedList.add(Integer.valueOf(i));
                }
                hashMap.put("input4", InputSpecUpdate.createPerTaskInputSpecUpdate(linkedList));
            }
            getContext().reconfigureVertex(hashMap, (VertexLocationHint) null, NUM_TASKS);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$TaskAttemptEventDispatcher.class */
    public class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        public void handle(TaskAttemptEvent taskAttemptEvent) {
            ((VertexImpl) TestVertexImpl.this.vertexIdMap.get(taskAttemptEvent.getTaskAttemptID().getTaskID().getVertexID())).getTask(taskAttemptEvent.getTaskAttemptID().getTaskID()).getAttempt(taskAttemptEvent.getTaskAttemptID()).handle(taskAttemptEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$TaskEventDispatcher.class */
    public class TaskEventDispatcher implements EventHandler<TaskEvent> {
        List<TaskEvent> events;

        private TaskEventDispatcher() {
            this.events = Lists.newArrayList();
        }

        public void handle(TaskEvent taskEvent) {
            this.events.add(taskEvent);
            VertexImpl vertexImpl = (VertexImpl) TestVertexImpl.this.vertexIdMap.get(taskEvent.getTaskID().getVertexID());
            EventHandler task = vertexImpl.getTask(taskEvent.getTaskID());
            if (task != null) {
                task.handle(taskEvent);
            } else {
                TestVertexImpl.LOG.warn("Task null for vertex: " + vertexImpl.getName() + " taskId: " + taskEvent.getTaskID() + ". Please check if this is important for the test");
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$TestUpdateListener.class */
    class TestUpdateListener implements VertexStateUpdateListener {
        List<VertexStateUpdate> events = Lists.newLinkedList();

        TestUpdateListener() {
        }

        public void onStateUpdated(VertexStateUpdate vertexStateUpdate) {
            this.events.add(vertexStateUpdate);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$VertexEventDispatcher.class */
    public class VertexEventDispatcher implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        public void handle(VertexEvent vertexEvent) {
            ((VertexImpl) TestVertexImpl.this.vertexIdMap.get(vertexEvent.getVertexId())).handle(vertexEvent);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$VertexImplWithControlledInitializerManager.class */
    public static class VertexImplWithControlledInitializerManager extends VertexImpl {
        private final DrainDispatcher dispatcher;
        private RootInputInitializerManagerControlled rootInputInitializerManager;

        public VertexImplWithControlledInitializerManager(TezVertexID tezVertexID, DAGProtos.VertexPlan vertexPlan, String str, Configuration configuration, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher drainDispatcher, StateChangeNotifier stateChangeNotifier, Configuration configuration2) {
            super(tezVertexID, vertexPlan, str, configuration, eventHandler, taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, true, appContext, vertexLocationHint, (Map) null, TestVertexImpl.taskSpecificLaunchCmdOption, stateChangeNotifier, configuration2);
            this.dispatcher = drainDispatcher;
        }

        protected RootInputInitializerManager createRootInputInitializerManager(String str, String str2, TezVertexID tezVertexID, EventHandler eventHandler, int i, int i2, Resource resource, Resource resource2) {
            try {
                this.rootInputInitializerManager = new RootInputInitializerManagerControlled(this, getAppContext(), eventHandler, this.dispatcher, this.stateChangeNotifier);
                return this.rootInputInitializerManager;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        RootInputInitializerManagerControlled getRootInputInitializerManager() {
            return this.rootInputInitializerManager;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$VertexImplWithRunningInputInitializer.class */
    public static class VertexImplWithRunningInputInitializer extends VertexImpl {
        private RootInputInitializerManagerWithRunningInitializer rootInputInitializerManager;
        private final InputInitializer presetInitializer;

        public VertexImplWithRunningInputInitializer(TezVertexID tezVertexID, DAGProtos.VertexPlan vertexPlan, String str, Configuration configuration, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher drainDispatcher, InputInitializer inputInitializer, StateChangeNotifier stateChangeNotifier, Configuration configuration2) {
            super(tezVertexID, vertexPlan, str, configuration, eventHandler, taskCommunicatorManagerInterface, clock, taskHeartbeatHandler, true, appContext, vertexLocationHint, (Map) null, TestVertexImpl.taskSpecificLaunchCmdOption, stateChangeNotifier, configuration2);
            this.presetInitializer = inputInitializer;
        }

        protected RootInputInitializerManager createRootInputInitializerManager(String str, String str2, TezVertexID tezVertexID, EventHandler eventHandler, int i, int i2, Resource resource, Resource resource2) {
            try {
                this.rootInputInitializerManager = new RootInputInitializerManagerWithRunningInitializer(this, getAppContext(), this.presetInitializer, this.stateChangeNotifier);
                return this.rootInputInitializerManager;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @InterfaceAudience.Private
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$VertexManagerWithException.class */
    public static class VertexManagerWithException extends ImmediateStartVertexManager {
        private VMExceptionLocation exLocation;

        /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestVertexImpl$VertexManagerWithException$VMExceptionLocation.class */
        public enum VMExceptionLocation {
            NoExceptionDoReconfigure,
            OnRootVertexInitialized,
            OnSourceTaskCompleted,
            OnVertexStarted,
            OnVertexManagerEventReceived,
            OnVertexManagerVertexStateUpdated,
            Initialize
        }

        public VertexManagerWithException(VertexManagerPluginContext vertexManagerPluginContext) {
            super(vertexManagerPluginContext);
        }

        public void initialize() {
            super.initialize();
            this.exLocation = VMExceptionLocation.valueOf(new String(getContext().getUserPayload().deepCopyAsArray()));
            if (this.exLocation == VMExceptionLocation.Initialize) {
                throw new RuntimeException(this.exLocation.name());
            }
            if (this.exLocation == VMExceptionLocation.NoExceptionDoReconfigure) {
                getContext().vertexReconfigurationPlanned();
            }
        }

        public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
            if (this.exLocation == VMExceptionLocation.OnRootVertexInitialized) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onRootVertexInitialized(str, inputDescriptor, list);
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
            if (this.exLocation == VMExceptionLocation.OnSourceTaskCompleted) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onSourceTaskCompleted(taskAttemptIdentifier);
        }

        public void onVertexStarted(List<TaskAttemptIdentifier> list) {
            if (this.exLocation == VMExceptionLocation.OnVertexStarted) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexStarted(list);
            if (this.exLocation == VMExceptionLocation.NoExceptionDoReconfigure) {
                getContext().doneReconfiguringVertex();
            }
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
            super.onVertexManagerEventReceived(vertexManagerEvent);
            if (this.exLocation == VMExceptionLocation.OnVertexManagerEventReceived) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
            super.onVertexStateUpdated(vertexStateUpdate);
            if (this.exLocation == VMExceptionLocation.OnVertexManagerVertexStateUpdated) {
                throw new RuntimeException(this.exLocation.name());
            }
        }
    }

    private DAGProtos.DAGPlan createInvalidDAGPlan() {
        LOG.info("Setting up invalid dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("testverteximplinvalid").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(0).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithCountingVM() {
        LOG.info("Setting up dag plan with coutning VertexManager");
        return DAGProtos.DAGPlan.newBuilder().setName("dagWithCountingVM").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(InvocationCountingVertexManager.class.getName())).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithVMException(String str, VertexManagerWithException.VMExceptionLocation vMExceptionLocation) {
        LOG.info("Setting up dag plan with VertexManager which would throw exception");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerWithException.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(vMExceptionLocation.name().getBytes())))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerWithException.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(vMExceptionLocation.name().getBytes())))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithIIException() {
        LOG.info("Setting up dag plan with VertexManager which would throw exception");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistInputInitializer() {
        LOG.info("Setting up dag plan with non exist inputinitializer");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-input-initializer")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistOutputCommitter() {
        LOG.info("Setting up dag plan with non exist output committer");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-output-committer")).setName("output1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("OutputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistVertexManager() {
        LOG.info("Setting up dag plan with non-exist VertexManager");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-vertexmanager")).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithMixedEdges() {
        LOG.info("Setting up mixed edge dag plan");
        DAG create = DAG.create("MixedEdges");
        org.apache.tez.dag.api.Vertex create2 = org.apache.tez.dag.api.Vertex.create("vertex1", ProcessorDescriptor.create("v1.class"), 1, Resource.newInstance(0, 0));
        org.apache.tez.dag.api.Vertex create3 = org.apache.tez.dag.api.Vertex.create("vertex2", ProcessorDescriptor.create("v2.class"), 1, Resource.newInstance(0, 0));
        org.apache.tez.dag.api.Vertex create4 = org.apache.tez.dag.api.Vertex.create("vertex3", ProcessorDescriptor.create("v3.class"), 1, Resource.newInstance(0, 0));
        org.apache.tez.dag.api.Vertex create5 = org.apache.tez.dag.api.Vertex.create("vertex4", ProcessorDescriptor.create("v4.class"), 1, Resource.newInstance(0, 0));
        org.apache.tez.dag.api.Vertex create6 = org.apache.tez.dag.api.Vertex.create("vertex5", ProcessorDescriptor.create("v5.class"), 1, Resource.newInstance(0, 0));
        org.apache.tez.dag.api.Vertex create7 = org.apache.tez.dag.api.Vertex.create("vertex6", ProcessorDescriptor.create("v6.class"), 1, Resource.newInstance(0, 0));
        create.addVertex(create2).addVertex(create3).addVertex(create4).addVertex(create5).addVertex(create6).addVertex(create7);
        create.addEdge(Edge.create(create2, create3, EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        create.addEdge(Edge.create(create2, create4, EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        create.addEdge(Edge.create(create5, create3, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        create.addEdge(Edge.create(create6, create4, EdgeProperty.create(EdgeProperty.DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        create.addEdge(Edge.create(create5, create7, EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        create.addEdge(Edge.create(create6, create7, EdgeProperty.create(EdgeProperty.DataMovementType.ONE_TO_ONE, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out.class"), InputDescriptor.create("out.class"))));
        return create.createDag(this.conf, (Credentials) null, (Map) null, (LocalResource) null, true);
    }

    private DAGProtos.DAGPlan createDAGPlanWithInitializer0Tasks(String str) {
        LOG.info("Setting up dag plan with input initializer and 0 tasks");
        return DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex1").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithMultipleInitializers(String str) {
        LOG.info("Setting up dag plan with multiple input initializer");
        return DAGProtos.DAGPlan.newBuilder().setName("testVertexWithMultipleInitializers").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithInputInitializer(String str) {
        LOG.info("Setting up dag plan with input initializer");
        return DAGProtos.DAGPlan.newBuilder().setName("testVertexWithInitializer").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input3").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(RootInputSpecUpdaterVertexManager.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(new byte[]{0})))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input4").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(RootInputSpecUpdaterVertexManager.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(new byte[]{1})))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer3() {
        LOG.info("Setting up dag plan with running input initializer3");
        return DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer3").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer4() {
        LOG.info("Setting up dag plan with running input initializer4");
        return DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer4").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer() {
        LOG.info("Setting up dag plan with running input initializer");
        return DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer2").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanWithInputDistributor(String str) {
        LOG.info("Setting up invalid dag plan with input distributor");
        return DAGProtos.DAGPlan.newBuilder().setName("testVertexWithInitializer").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v5")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanForOneToOneSplit(String str, int i, boolean z) {
        DAGProtos.VertexPlan.Builder newBuilder = DAGProtos.VertexPlan.newBuilder();
        newBuilder.setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addOutEdgeId("e1").addOutEdgeId("e2");
        if (z) {
            newBuilder.addOutEdgeId("e5");
        }
        if (str != null) {
            i = -1;
            newBuilder.addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(str)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build());
        } else {
            newBuilder.setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(i).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build());
        }
        DAGProtos.VertexPlan build = newBuilder.build();
        DAGProtos.VertexPlan.Builder newBuilder2 = DAGProtos.VertexPlan.newBuilder();
        newBuilder2.setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(i).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addInEdgeId("e4");
        if (z) {
            newBuilder2.addOutEdgeId("e6");
        }
        DAGProtos.VertexPlan build2 = newBuilder2.build();
        LOG.info("Setting up one to one dag plan");
        DAGProtos.DAGPlan.Builder addEdge = DAGProtos.DAGPlan.newBuilder().setName("testVertexOneToOneSplit").addVertex(build).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(i).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").addOutEdgeId("e3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(i).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addInEdgeId("e2").addOutEdgeId("e4").build()).addVertex(build2).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v4")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v3_v4")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL));
        if (z) {
            addEdge.addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v5")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v4_v6")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build());
        }
        return addEdge.build();
    }

    private DAGProtos.DAGPlan createTestDAGPlan() {
        LOG.info("Setting up dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addLocalResource(DAGProtos.PlanLocalResource.newBuilder().setName("dag lr").setUri("dag ir uri").setSize(1L).setTimeStamp(1L).setType(DAGProtos.PlanLocalResourceType.FILE).setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").addLocalResource(DAGProtos.PlanLocalResource.newBuilder().setName("vertex lr").setUri("vertex ir uri").setSize(1L).setTimeStamp(1L).setType(DAGProtos.PlanLocalResourceType.FILE).setVisibility(DAGProtos.PlanLocalResourceVisibility.APPLICATION).build()).build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutEdgeId("e3").addOutEdgeId("e4").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack4").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addOutEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host5").addRack("rack5").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x5.y5").build()).addInEdgeId("e4").addOutEdgeId("e6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host6").addRack("rack6").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()))).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x6.y6").build()).addInEdgeId("e5").addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i4_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v4")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i5_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v5")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(EdgeManagerForTest.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(this.edgePayload))).build()).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o4")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")).setInputVertexName("vertex5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("org.apache.tez.o5")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createSamplerDAGPlan(boolean z) {
        LOG.info("Setting up dag plan");
        DAGProtos.VertexPlan.Builder newBuilder = DAGProtos.VertexPlan.newBuilder();
        newBuilder.setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(z ? -1 : 2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addInEdgeId("A_C").addInEdgeId("B_C");
        if (z) {
            newBuilder.setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName()));
        }
        return DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_B").addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addInEdgeId("A_B").addOutEdgeId("B_C").build()).addVertex(newBuilder.build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")).setOutputVertexName("B").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("A_B").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(z ? DAGProtos.PlanEdgeDataMovementType.CUSTOM : DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(z ? DAGProtos.PlanEdgeDataMovementType.CUSTOM : DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createVertexGroupDAGPlan() {
        LOG.info("Setting up group dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("TestGroupDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addOutEdgeId("B_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("A_C").addInEdgeId("B_C").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addVertexGroups(DAGProtos.PlanVertexGroupInfo.newBuilder().setGroupName("Group").addGroupMembers("A").addGroupMembers("B").addEdgeMergedInputs(DAGProtos.PlanGroupInputEdgeInfo.newBuilder().setDestVertexName("C").setMergedInput(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("Group.class").build()).build())).build();
    }

    private DAGProtos.DAGPlan createDAGWithCustomVertexManager() {
        LOG.info("Setting up custom vertex manager dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("TestCustomVMDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("v1").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("v2").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addOutEdgeId("2_3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("v3").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addInEdgeId("2_3").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("2_3")).setInputVertexName("v2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("2_3.class")).setOutputVertexName("v3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("2_3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createSamplerDAGPlan2() {
        LOG.info("Setting up sampler 2 dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addOutEdgeId("B_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("A_C").addInEdgeId("B_C").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private DAGProtos.DAGPlan createDAGPlanForGraceParallelism() throws IOException {
        LOG.info("Setting up grace parallelism dag plan");
        return DAGProtos.DAGPlan.newBuilder().setName("GraceParallelismDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addOutEdgeId("A_B").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addInEdgeId("A_B").addOutEdgeId("B_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("C").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(GraceShuffleVertexManagerForTest.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(GraceShuffleVertexManagerForTest.newConfBuilder().setGrandparentVertex("A").setDesiredParallelism(1).toByteString()).build())).addInEdgeId("B_C").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")).setOutputVertexName("B").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_B").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    private void setupVertices() {
        int vertexCount = this.dagPlan.getVertexCount();
        LOG.info("Setting up vertices from dag plan, verticesCnt=" + vertexCount);
        this.vertices = new HashMap();
        this.vertexIdMap = new HashMap();
        Configuration configuration = new Configuration(false);
        configuration.set("abc", "foobar");
        for (int i = 0; i < vertexCount; i++) {
            DAGProtos.VertexPlan vertex = this.dagPlan.getVertex(i);
            String name = vertex.getName();
            TezVertexID tezVertexID = TezVertexID.getInstance(this.dagId, i + 1);
            VertexLocationHint convertFromDAGPlan = DagTypeConverters.convertFromDAGPlan(vertex.getTaskLocationHintList());
            VertexImpl vertexImplWithControlledInitializerManager = this.useCustomInitializer ? this.customInitializer == null ? new VertexImplWithControlledInitializerManager(tezVertexID, vertex, vertex.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, this.appContext, convertFromDAGPlan, this.dispatcher, this.updateTracker, configuration) : new VertexImplWithRunningInputInitializer(tezVertexID, vertex, vertex.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, this.appContext, convertFromDAGPlan, this.dispatcher, this.customInitializer, this.updateTracker, configuration) : new VertexImpl(tezVertexID, vertex, vertex.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, true, this.appContext, convertFromDAGPlan, this.vertexGroups, taskSpecificLaunchCmdOption, this.updateTracker, configuration);
            this.vertices.put(name, vertexImplWithControlledInitializerManager);
            this.vertexIdMap.put(tezVertexID, vertexImplWithControlledInitializerManager);
        }
    }

    private void parseVertexEdges() {
        LOG.info("Parsing edges from dag plan, edgeCount=" + this.dagPlan.getEdgeCount());
        int vertexCount = this.dagPlan.getVertexCount();
        Map createEdgePlanMapFromDAGPlan = DagTypeConverters.createEdgePlanMapFromDAGPlan(this.dagPlan.getEdgeList());
        for (int i = 0; i < vertexCount; i++) {
            DAGProtos.VertexPlan vertex = this.dagPlan.getVertex(i);
            Vertex vertex2 = this.vertices.get(vertex.getName());
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            for (String str : vertex.getInEdgeIdList()) {
                Vertex vertex3 = this.vertices.get(((DAGProtos.EdgePlan) createEdgePlanMapFromDAGPlan.get(str)).getInputVertexName());
                Edge edge = this.edges.get(str);
                edge.setSourceVertex(vertex3);
                edge.setDestinationVertex(vertex2);
                hashMap.put(vertex3, edge);
            }
            for (String str2 : vertex.getOutEdgeIdList()) {
                Vertex vertex4 = this.vertices.get(((DAGProtos.EdgePlan) createEdgePlanMapFromDAGPlan.get(str2)).getOutputVertexName());
                Edge edge2 = this.edges.get(str2);
                edge2.setSourceVertex(vertex2);
                edge2.setDestinationVertex(vertex4);
                hashMap2.put(vertex4, edge2);
            }
            LOG.info("Setting input vertices for vertex " + vertex2.getName() + ", inputVerticesCnt=" + hashMap.size());
            vertex2.setInputVertices(hashMap);
            LOG.info("Setting output vertices for vertex " + vertex2.getName() + ", outputVerticesCnt=" + hashMap2.size());
            vertex2.setOutputVertices(hashMap2);
        }
    }

    public void setupPreDagCreation() {
        LOG.info("____________ RESETTING CURRENT DAG ____________");
        this.conf = new Configuration();
        this.conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(100L, 1), 1);
        this.dagId = TezDAGID.getInstance(this.appAttemptId.getApplicationId(), 1);
        taskSpecificLaunchCmdOption = (TaskSpecificLaunchCmdOption) Mockito.mock(TaskSpecificLaunchCmdOption.class);
        ((TaskSpecificLaunchCmdOption) Mockito.doReturn(false).when(taskSpecificLaunchCmdOption)).addTaskSpecificLaunchCmdOption((String) Matchers.any(String.class), Matchers.anyInt());
    }

    public void setupPostDagCreation() throws TezException {
        if (this.dispatcher != null) {
            this.dispatcher.stop();
        }
        this.dispatcher = new DrainDispatcher();
        this.appContext = (AppContext) Mockito.mock(AppContext.class);
        Mockito.when(this.appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
        Mockito.when(this.appContext.getContainerLauncherName(Matchers.anyInt())).thenReturn(TezConstants.getTezYarnServicePluginName());
        this.thh = (TaskHeartbeatHandler) Mockito.mock(TaskHeartbeatHandler.class);
        this.historyEventHandler = (HistoryEventHandler) Mockito.mock(HistoryEventHandler.class);
        TaskSchedulerManager taskSchedulerManager = (TaskSchedulerManager) Mockito.mock(TaskSchedulerManager.class);
        try {
            UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
            org.apache.tez.dag.app.dag.DAG dag = (org.apache.tez.dag.app.dag.DAG) Mockito.mock(org.apache.tez.dag.app.dag.DAG.class);
            ((org.apache.tez.dag.app.dag.DAG) Mockito.doReturn(currentUser).when(dag)).getDagUGI();
            ((org.apache.tez.dag.app.dag.DAG) Mockito.doReturn("dag0").when(dag)).getName();
            HashMap hashMap = new HashMap();
            for (DAGProtos.PlanLocalResource planLocalResource : this.dagPlan.getLocalResourceList()) {
                hashMap.put(planLocalResource.getName(), DagTypeConverters.convertPlanLocalResourceToLocalResource(planLocalResource));
            }
            Mockito.when(dag.getLocalResources()).thenReturn(hashMap);
            ((AppContext) Mockito.doReturn(this.appAttemptId).when(this.appContext)).getApplicationAttemptId();
            ((AppContext) Mockito.doReturn(this.appAttemptId.getApplicationId()).when(this.appContext)).getApplicationID();
            ((AppContext) Mockito.doReturn(dag).when(this.appContext)).getCurrentDAG();
            this.execService = (ListeningExecutorService) Mockito.mock(ListeningExecutorService.class);
            final ListenableFuture listenableFuture = (ListenableFuture) Mockito.mock(ListenableFuture.class);
            ((ListeningExecutorService) Mockito.doAnswer(new Answer() { // from class: org.apache.tez.dag.app.dag.impl.TestVertexImpl.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public ListenableFuture<Void> m23answer(InvocationOnMock invocationOnMock) {
                    TestVertexImpl.this.dispatcher.getEventHandler().handle((CallableEvent) invocationOnMock.getArguments()[0]);
                    return listenableFuture;
                }
            }).when(this.execService)).submit((Callable) Matchers.any());
            MockClock mockClock = new MockClock();
            ((AppContext) Mockito.doReturn(this.execService).when(this.appContext)).getExecService();
            ((AppContext) Mockito.doReturn(this.conf).when(this.appContext)).getAMConf();
            ((org.apache.tez.dag.app.dag.DAG) Mockito.doReturn(new Credentials()).when(dag)).getCredentials();
            ((org.apache.tez.dag.app.dag.DAG) Mockito.doReturn(DAGProtos.DAGPlan.getDefaultInstance()).when(dag)).getJobPlan();
            ((AppContext) Mockito.doReturn(this.dagId).when(this.appContext)).getCurrentDAGID();
            ((org.apache.tez.dag.app.dag.DAG) Mockito.doReturn(this.dagId).when(dag)).getID();
            ((AppContext) Mockito.doReturn(taskSchedulerManager).when(this.appContext)).getTaskScheduler();
            ((TaskSchedulerManager) Mockito.doReturn(Resource.newInstance(102400, 60)).when(taskSchedulerManager)).getTotalResources(0);
            ((AppContext) Mockito.doReturn(this.historyEventHandler).when(this.appContext)).getHistoryHandler();
            ((AppContext) Mockito.doReturn(this.dispatcher.getEventHandler()).when(this.appContext)).getEventHandler();
            ((AppContext) Mockito.doReturn(mockClock).when(this.appContext)).getClock();
            this.vertexGroups = Maps.newHashMap();
            for (DAGProtos.PlanVertexGroupInfo planVertexGroupInfo : this.dagPlan.getVertexGroupsList()) {
                this.vertexGroups.put(planVertexGroupInfo.getGroupName(), new DAGImpl.VertexGroupInfo(planVertexGroupInfo));
            }
            if (this.updateTracker != null) {
                this.updateTracker.stop();
            }
            this.updateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.appContext.getCurrentDAG());
            setupVertices();
            Mockito.when(dag.getVertex((TezVertexID) Matchers.any(TezVertexID.class))).thenAnswer(new Answer<Vertex>() { // from class: org.apache.tez.dag.app.dag.impl.TestVertexImpl.2
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Vertex m24answer(InvocationOnMock invocationOnMock) throws Throwable {
                    Object[] arguments = invocationOnMock.getArguments();
                    if (arguments.length != 1) {
                        return null;
                    }
                    return (Vertex) TestVertexImpl.this.vertexIdMap.get((TezVertexID) arguments[0]);
                }
            });
            Mockito.when(dag.getVertex((String) Matchers.any(String.class))).thenAnswer(new Answer<Vertex>() { // from class: org.apache.tez.dag.app.dag.impl.TestVertexImpl.3
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Vertex m25answer(InvocationOnMock invocationOnMock) throws Throwable {
                    Object[] arguments = invocationOnMock.getArguments();
                    if (arguments.length != 1) {
                        return null;
                    }
                    return (Vertex) TestVertexImpl.this.vertices.get((String) arguments[0]);
                }
            });
            this.edges = new HashMap();
            for (DAGProtos.EdgePlan edgePlan : this.dagPlan.getEdgeList()) {
                this.edges.put(edgePlan.getId(), new Edge(DagTypeConverters.createEdgePropertyMapFromDAGPlan(edgePlan), this.dispatcher.getEventHandler(), this.conf));
            }
            parseVertexEdges();
            Iterator<Edge> it = this.edges.values().iterator();
            while (it.hasNext()) {
                it.next().initialize();
            }
            this.dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
            this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
            this.dispatcher.register(TaskAttemptEventType.class, this.taskAttemptEventDispatcher);
            this.taskEventDispatcher = new TaskEventDispatcher();
            this.dispatcher.register(TaskEventType.class, this.taskEventDispatcher);
            this.vertexEventDispatcher = new VertexEventDispatcher();
            this.dispatcher.register(VertexEventType.class, this.vertexEventDispatcher);
            this.dagEventDispatcher = new DagEventDispatcher();
            this.dispatcher.register(DAGEventType.class, this.dagEventDispatcher);
            this.amSchedulerEventDispatcher = new AMSchedulerEventDispatcher();
            this.dispatcher.register(AMSchedulerEventType.class, this.amSchedulerEventDispatcher);
            this.dispatcher.init(this.conf);
            this.dispatcher.start();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    @Before
    public void setup() throws TezException {
        this.useCustomInitializer = false;
        this.customInitializer = null;
        setupPreDagCreation();
        this.dagPlan = createTestDAGPlan();
        this.invalidDagPlan = createInvalidDAGPlan();
        setupPostDagCreation();
    }

    @After
    public void teardown() {
        if (this.dispatcher.isInState(Service.STATE.STARTED)) {
            this.dispatcher.await();
            this.dispatcher.stop();
        }
        this.updateTracker.stop();
        this.execService.shutdownNow();
        this.dispatcher = null;
        this.vertexEventDispatcher = null;
        this.dagEventDispatcher = null;
        this.dagPlan = null;
        this.invalidDagPlan = null;
        this.vertices = null;
        this.edges = null;
        this.vertexIdMap = null;
    }

    private void initAllVertices(VertexState vertexState) {
        for (int i = 1; i <= this.vertices.size(); i++) {
            VertexImpl vertexImpl = this.vertices.get("vertex" + i);
            if (vertexImpl.sourceVertices == null || vertexImpl.sourceVertices.isEmpty()) {
                initVertex(vertexImpl);
            }
        }
        for (int i2 = 1; i2 <= this.vertices.size(); i2++) {
            Assert.assertEquals(vertexState, this.vertices.get("vertex" + i2).getState());
        }
    }

    private void initVertex(VertexImpl vertexImpl) {
        Assert.assertEquals(VertexState.NEW, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
    }

    private void startVertex(VertexImpl vertexImpl) {
        startVertex(vertexImpl, true);
    }

    private void killVertex(VertexImpl vertexImpl) {
        this.dispatcher.getEventHandler().handle(new VertexEventTermination(vertexImpl.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        Assert.assertEquals(vertexImpl.getTerminationCause(), VertexTerminationCause.DAG_TERMINATED);
    }

    private void startVertex(VertexImpl vertexImpl, boolean z) {
        Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        if (z) {
            Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        }
    }

    private void completeAllTasksSuccessfully(Vertex vertex) {
        Assert.assertEquals(VertexState.RUNNING, vertex.getState());
        Set keySet = vertex.getTasks().keySet();
        Assert.assertFalse(keySet.isEmpty());
        Iterator it = keySet.iterator();
        while (it.hasNext()) {
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted((TezTaskID) it.next(), TaskState.SUCCEEDED));
        }
        this.dispatcher.await();
    }

    @Test(timeout = 5000)
    public void testVertexInit() throws AMUserCodeException {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        Assert.assertEquals("x3.y3", vertexImpl.getProcessorName());
        Assert.assertTrue(vertexImpl.getJavaOpts().contains("foo"));
        Assert.assertEquals(2L, vertexImpl.getInputSpecList(0).size());
        Assert.assertEquals(2L, vertexImpl.getInputVerticesCount());
        Assert.assertEquals(2L, vertexImpl.getOutputVerticesCount());
        Assert.assertEquals(2L, vertexImpl.getOutputVerticesCount());
        Assert.assertTrue("vertex1".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(0)).getSourceVertexName()) || "vertex2".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(0)).getSourceVertexName()));
        Assert.assertTrue("vertex1".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(1)).getSourceVertexName()) || "vertex2".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(1)).getSourceVertexName()));
        Assert.assertTrue("i3_v1".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(0)).getInputDescriptor().getClassName()) || "i3_v2".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(0)).getInputDescriptor().getClassName()));
        Assert.assertTrue("i3_v1".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(1)).getInputDescriptor().getClassName()) || "i3_v2".equals(((InputSpec) vertexImpl.getInputSpecList(0).get(1)).getInputDescriptor().getClassName()));
        Assert.assertTrue("vertex4".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(0)).getDestinationVertexName()) || "vertex5".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(0)).getDestinationVertexName()));
        Assert.assertTrue("vertex4".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(1)).getDestinationVertexName()) || "vertex5".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(1)).getDestinationVertexName()));
        Assert.assertTrue("o3_v4".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(0)).getOutputDescriptor().getClassName()) || "o3_v5".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(0)).getOutputDescriptor().getClassName()));
        Assert.assertTrue("o3_v4".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(1)).getOutputDescriptor().getClassName()) || "o3_v5".equals(((OutputSpec) vertexImpl.getOutputSpecList(0).get(1)).getOutputDescriptor().getClassName()));
    }

    @Test(timeout = 5000)
    public void testNonExistVertexManager() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithNonExistVertexManager();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        vertexImpl.handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), "").contains("java.lang.ClassNotFoundException: non-exist-vertexmanager"));
    }

    @Test(timeout = 5000)
    public void testNonExistInputInitializer() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithNonExistInputInitializer();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        vertexImpl.handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), "").contains("java.lang.ClassNotFoundException: non-exist-input-initializer"));
    }

    @Test(timeout = 5000)
    public void testNonExistOutputCommitter() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithNonExistOutputCommitter();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        vertexImpl.handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.INIT_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), "").contains("java.lang.ClassNotFoundException: non-exist-output-committer"));
    }

    @Test(timeout = 5000)
    public void testVertexConfigureEvent() throws Exception {
        initAllVertices(VertexState.INITED);
        TestUpdateListener testUpdateListener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates("vertex3", EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), testUpdateListener);
        Assert.assertEquals(1L, testUpdateListener.events.size());
        Assert.assertEquals("vertex3", testUpdateListener.events.get(0).getVertexName());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, testUpdateListener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates("vertex3", testUpdateListener);
    }

    @Test(timeout = 5000)
    public void testVertexConfigureEventWithReconfigure() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VertexManagerWithException.VMExceptionLocation.NoExceptionDoReconfigure);
        setupPostDagCreation();
        TestUpdateListener testUpdateListener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates("vertex2", EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), testUpdateListener);
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
        Assert.assertEquals(0L, testUpdateListener.events.size());
        startVertex(vertexImplWithControlledInitializerManager, true);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, testUpdateListener.events.size());
        Assert.assertEquals("vertex2", testUpdateListener.events.get(0).getVertexName());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, testUpdateListener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates("vertex2", testUpdateListener);
    }

    @Test(timeout = 5000)
    public void testVertexConfigureEventBadReconfigure() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        VertexImpl vertexImpl3 = this.vertices.get("vertex1");
        startVertex(vertexImpl2);
        startVertex(vertexImpl3);
        try {
            vertexImpl.doneReconfiguringVertex();
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertTrue(e.getMessage().contains("invoked only after vertexReconfigurationPlanned"));
        }
    }

    @Test(timeout = 5000)
    public void testVertexConfigureEventBadSetParallelism() throws Exception {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        VertexImpl vertexImpl3 = this.vertices.get("vertex1");
        startVertex(vertexImpl2);
        startVertex(vertexImpl3);
        try {
            vertexImpl.reconfigureVertex(1, (VertexLocationHint) null, (Map) null);
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() before re-configuring"));
        }
    }

    @Test(timeout = 5000)
    public void testVertexStart() {
        initAllVertices(VertexState.INITED);
        startVertex(this.vertices.get("vertex2"));
    }

    @Test(timeout = 5000)
    public void testVertexGetTAAttempts() throws Exception {
        initAllVertices(VertexState.INITED);
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex4");
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.sourceVertices.size());
        Edge edge = (Edge) vertexImpl2.sourceVertices.get(vertexImpl);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl.getVertexId(), 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl2.getVertexId(), 0), 0);
        for (int i = 0; i < 5; i++) {
            vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(DataMovementEvent.create(0, (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        }
        this.dispatcher.await();
        Assert.assertEquals(5L, vertexImpl2.pendingTaskEvents.size());
        LinkedList linkedList = new LinkedList();
        for (int i2 = 0; i2 < vertexImpl2.getTotalTasks(); i2++) {
            linkedList.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i2, (TaskLocationHint) null));
        }
        vertexImpl2.scheduleTasks(linkedList);
        this.dispatcher.await();
        Assert.assertEquals(5L, vertexImpl2.getOnDemandRouteEvents().size());
        for (int i3 = 5; i3 < 11; i3++) {
            vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(DataMovementEvent.create(0, (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        }
        this.dispatcher.await();
        Assert.assertEquals(11L, vertexImpl2.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand edgeManagerPluginOnDemand = (EdgeManagerPluginOnDemand) Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata create = EdgeManagerPluginOnDemand.EventRouteMetadata.create(1, new int[]{0});
        edge.edgeManager = edgeManagerPluginOnDemand;
        Mockito.when(edgeManagerPluginOnDemand.routeDataMovementEventToDestination(1, 0, 0)).thenReturn(create);
        TaskAttemptEventInfo taskAttemptTezEvents = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, 0, 0, 1);
        Assert.assertEquals(11L, taskAttemptTezEvents.getNextFromEventId());
        Assert.assertEquals(0L, taskAttemptTezEvents.getEvents().size());
        int i4 = 0;
        Mockito.when(edgeManagerPluginOnDemand.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn(create);
        for (int i5 = 0; i5 < 11; i5++) {
            i4 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i4, 0, 1).getNextFromEventId();
            Assert.assertEquals(i5 + 1, i4);
            Assert.assertEquals(1L, r0.getEvents().size());
        }
        TaskAttemptEventInfo taskAttemptTezEvents2 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i4, 0, 1);
        Assert.assertEquals(11L, taskAttemptTezEvents2.getNextFromEventId());
        Assert.assertEquals(0L, taskAttemptTezEvents2.getEvents().size());
        TaskAttemptEventInfo taskAttemptTezEvents3 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, 0, 0, 100);
        Assert.assertEquals(11L, taskAttemptTezEvents3.getNextFromEventId());
        Assert.assertEquals(11L, taskAttemptTezEvents3.getEvents().size());
        int i6 = 0;
        for (int i7 = 1; i7 <= 2; i7++) {
            i6 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i6, 0, 5).getNextFromEventId();
            Assert.assertEquals(i7 * 5, i6);
            Assert.assertEquals(5L, r0.getEvents().size());
        }
        TaskAttemptEventInfo taskAttemptTezEvents4 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i6, 0, 5);
        Assert.assertEquals(11L, taskAttemptTezEvents4.getNextFromEventId());
        Assert.assertEquals(1L, taskAttemptTezEvents4.getEvents().size());
        Mockito.when(edgeManagerPluginOnDemand.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn(EdgeManagerPluginOnDemand.EventRouteMetadata.create(2, new int[]{0, 0}));
        int i8 = 0;
        int i9 = 0;
        for (int i10 = 1; i10 <= 4; i10++) {
            i8 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i8, 0, 5).getNextFromEventId();
            i9 = i10 % 2 > 0 ? i9 + 2 : i9 + 3;
            Assert.assertEquals(i9, i8);
            Assert.assertEquals(5L, r0.getEvents().size());
        }
        TaskAttemptEventInfo taskAttemptTezEvents5 = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, i8, 0, 5);
        Assert.assertEquals(11L, taskAttemptTezEvents5.getNextFromEventId());
        Assert.assertEquals(2L, taskAttemptTezEvents5.getEvents().size());
    }

    @Test(timeout = 5000)
    public void testVertexGetTAAttemptsObsoletion() throws Exception {
        initAllVertices(VertexState.INITED);
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex4");
        LinkedList linkedList = new LinkedList();
        for (int i = 0; i < vertexImpl2.getTotalTasks(); i++) {
            linkedList.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, (TaskLocationHint) null));
        }
        vertexImpl2.scheduleTasks(linkedList);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.sourceVertices.size());
        Edge edge = (Edge) vertexImpl2.sourceVertices.get(vertexImpl);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl.getVertexId(), 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl2.getVertexId(), 0), 0);
        for (int i2 = 0; i2 < 11; i2++) {
            vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(DataMovementEvent.create(0, (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        }
        this.dispatcher.await();
        Assert.assertEquals(11L, vertexImpl2.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand edgeManagerPluginOnDemand = (EdgeManagerPluginOnDemand) Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata create = EdgeManagerPluginOnDemand.EventRouteMetadata.create(1, new int[]{0});
        edge.edgeManager = edgeManagerPluginOnDemand;
        Mockito.when(edgeManagerPluginOnDemand.routeInputSourceTaskFailedEventToDestination(Matchers.anyInt(), Matchers.anyInt())).thenReturn(create);
        Mockito.when(edgeManagerPluginOnDemand.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn(create);
        vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(InputFailedEvent.create(0, 0), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        TaskAttemptEventInfo taskAttemptTezEvents = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, 0, 0, 100);
        Assert.assertEquals(12L, taskAttemptTezEvents.getNextFromEventId());
        Assert.assertEquals(1L, taskAttemptTezEvents.getEvents().size());
        Assert.assertEquals(EventType.INPUT_FAILED_EVENT, ((TezEvent) taskAttemptTezEvents.getEvents().get(0)).getEventType());
        for (int i3 = 11; i3 < 14; i3++) {
            vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(DataMovementEvent.create(0, (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        }
        this.dispatcher.await();
        Assert.assertEquals(12L, vertexImpl2.getOnDemandRouteEvents().size());
        Assert.assertEquals(EventType.INPUT_FAILED_EVENT, ((TezEvent) vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, 0, 0, 100).getEvents().get(0)).getEventType());
    }

    @Test(timeout = 5000)
    public void testVertexGetTAAttemptsObsoletionWithPendingRoutes() throws Exception {
        initAllVertices(VertexState.INITED);
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex4");
        LinkedList linkedList = new LinkedList();
        for (int i = 0; i < vertexImpl2.getTotalTasks(); i++) {
            linkedList.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, (TaskLocationHint) null));
        }
        vertexImpl2.scheduleTasks(linkedList);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(1L, vertexImpl2.sourceVertices.size());
        Edge edge = (Edge) vertexImpl2.sourceVertices.get(vertexImpl);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl.getVertexId(), 0), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl2.getVertexId(), 0), 0);
        for (int i2 = 0; i2 < 11; i2++) {
            vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(DataMovementEvent.create(0, (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        }
        this.dispatcher.await();
        Assert.assertEquals(11L, vertexImpl2.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand edgeManagerPluginOnDemand = (EdgeManagerPluginOnDemand) Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata create = EdgeManagerPluginOnDemand.EventRouteMetadata.create(1, new int[]{0});
        edge.edgeManager = edgeManagerPluginOnDemand;
        Mockito.when(edgeManagerPluginOnDemand.routeInputSourceTaskFailedEventToDestination(Matchers.anyInt(), Matchers.anyInt())).thenReturn(create);
        Mockito.when(edgeManagerPluginOnDemand.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn(EdgeManagerPluginOnDemand.EventRouteMetadata.create(2, new int[]{0, 0}));
        int nextFromEventId = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, 0, 0, 5).getNextFromEventId();
        Assert.assertEquals(2L, nextFromEventId);
        Assert.assertEquals(5L, r0.getEvents().size());
        vertexImpl2.handle(new VertexEventRouteEvent(vertexImpl2.getVertexId(), Collections.singletonList(new TezEvent(InputFailedEvent.create(0, 0), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl.getName(), vertexImpl.getName(), tezTaskAttemptID)))));
        TaskAttemptEventInfo taskAttemptTezEvents = vertexImpl2.getTaskAttemptTezEvents(tezTaskAttemptID2, nextFromEventId, 0, 5);
        Assert.assertEquals(12L, taskAttemptTezEvents.getNextFromEventId());
        Assert.assertEquals(1L, taskAttemptTezEvents.getEvents().size());
        Assert.assertEquals(EventType.INPUT_FAILED_EVENT, ((TezEvent) taskAttemptTezEvents.getEvents().get(0)).getEventType());
    }

    @Test(timeout = 5000)
    public void testVertexReconfigurePlannedAfterInit() throws Exception {
        initAllVertices(VertexState.INITED);
        try {
            this.vertices.get("vertex3").vertexReconfigurationPlanned();
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertTrue(e.getMessage().contains("context.vertexReconfigurationPlanned() cannot be called after initialize()"));
        }
    }

    private void checkTasks(Vertex vertex, int i) {
        Assert.assertEquals(i, vertex.getTotalTasks());
        Map tasks = vertex.getTasks();
        Assert.assertEquals(i, tasks.size());
        int i2 = 0;
        Iterator it = tasks.values().iterator();
        while (it.hasNext()) {
            Assert.assertEquals(i2, ((Task) it.next()).getTaskId().getId());
            i2++;
        }
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismDecrease() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(1, (VertexLocationHint) null, Collections.singletonMap(vertexImpl2.getName(), EdgeProperty.create(EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()), EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
        vertexImpl.doneReconfiguringVertex();
        Assert.assertTrue(((Edge) vertexImpl.sourceVertices.get(vertexImpl2)).getEdgeManager() instanceof EdgeManagerForTest);
        checkTasks(vertexImpl, 1);
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismIncrease() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(10, (VertexLocationHint) null, Collections.singletonMap(vertexImpl2.getName(), EdgeProperty.create(EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName()), EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
        vertexImpl.doneReconfiguringVertex();
        Assert.assertTrue(((Edge) vertexImpl.sourceVertices.get(vertexImpl2)).getEdgeManager() instanceof EdgeManagerForTest);
        checkTasks(vertexImpl, 10);
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismMultiple() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(10, (VertexLocationHint) null, (Map) null);
        checkTasks(vertexImpl, 10);
        vertexImpl.reconfigureVertex(5, (VertexLocationHint) null, (Map) null);
        checkTasks(vertexImpl, 5);
        vertexImpl.doneReconfiguringVertex();
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismMultipleFailAfterDone() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(10, (VertexLocationHint) null, (Map) null);
        checkTasks(vertexImpl, 10);
        vertexImpl.doneReconfiguringVertex();
        try {
            vertexImpl.reconfigureVertex(5, (VertexLocationHint) null, (Map) null);
            Assert.fail();
        } catch (IllegalStateException e) {
            Assert.assertTrue(e.getMessage().contains("Vertex is fully configured but still"));
        }
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismMultipleFailAfterSchedule() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(10, (VertexLocationHint) null, (Map) null);
        checkTasks(vertexImpl, 10);
        vertexImpl.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create(0, (TaskLocationHint) null)));
        try {
            vertexImpl.reconfigureVertex(5, (VertexLocationHint) null, (Map) null);
            Assert.fail();
        } catch (TezUncheckedException e) {
            Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
        }
    }

    @Test(timeout = 5000)
    public void testVertexScheduleSendEvent() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.reconfigureVertex(10, (VertexLocationHint) null, (Map) null);
        checkTasks(vertexImpl, 10);
        this.taskEventDispatcher.events.clear();
        TaskLocationHint taskLocationHint = (TaskLocationHint) Mockito.mock(TaskLocationHint.class);
        vertexImpl.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create(0, taskLocationHint)));
        this.dispatcher.await();
        Assert.assertEquals(1L, this.taskEventDispatcher.events.size());
        TaskEventScheduleTask taskEventScheduleTask = this.taskEventDispatcher.events.get(0);
        Assert.assertEquals(taskLocationHint, taskEventScheduleTask.getTaskLocationHint());
        Assert.assertNotNull(taskEventScheduleTask.getBaseTaskSpec());
        Assert.assertEquals("foobar", taskEventScheduleTask.getBaseTaskSpec().getTaskConf().get("abc"));
    }

    @Test(timeout = 5000)
    public void testVertexSetParallelismFailAfterSchedule() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(2L, vertexImpl.getTotalTasks());
        Assert.assertEquals(2L, vertexImpl.getTasks().size());
        VertexImpl vertexImpl2 = this.vertices.get("vertex1");
        startVertex(this.vertices.get("vertex2"));
        startVertex(vertexImpl2);
        vertexImpl.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create(0, (TaskLocationHint) null)));
        try {
            vertexImpl.reconfigureVertex(5, (VertexLocationHint) null, (Map) null);
            Assert.fail();
        } catch (TezUncheckedException e) {
            Assert.assertTrue(e.getMessage().contains("setParallelism cannot be called after scheduling"));
        }
    }

    @Test(timeout = 5000)
    public void testVertexPendingTaskEvents() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        startVertex(this.vertices.get("vertex1"));
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl2.getVertexId(), 0), 0);
        LinkedList newLinkedList = Lists.newLinkedList();
        TezEvent tezEvent = new TezEvent(CompositeDataMovementEvent.create(0, 1, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", tezTaskAttemptID));
        TezEvent tezEvent2 = new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", tezTaskAttemptID));
        newLinkedList.add(tezEvent);
        newLinkedList.add(tezEvent2);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), newLinkedList));
        this.dispatcher.await();
        Assert.assertEquals(2L, vertexImpl.pendingTaskEvents.size());
        vertexImpl.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create(0, (TaskLocationHint) null)));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.pendingTaskEvents.size());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), newLinkedList));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl.pendingTaskEvents.size());
    }

    @Test(timeout = 5000)
    public void testSetCustomEdgeManager() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex5");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        Assert.assertTrue(Arrays.equals(this.edgePayload, this.edges.get("e4").getEdgeManager().getEdgeManagerContext().getUserPayload().deepCopyAsArray()));
        UserPayload create = UserPayload.create(ByteBuffer.wrap(new String("foo").getBytes()));
        EdgeManagerPluginDescriptor create2 = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        create2.setUserPayload(create);
        EdgeProperty create3 = EdgeProperty.create(create2, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"));
        Vertex vertex = this.vertices.get("vertex3");
        vertexImpl.reconfigureVertex(vertexImpl.getTotalTasks() - 1, (VertexLocationHint) null, Collections.singletonMap(vertex.getName(), create3));
        vertexImpl.doneReconfiguringVertex();
        EdgeManagerForTest edgeManager = ((Edge) vertexImpl.sourceVertices.get(vertex)).getEdgeManager();
        Assert.assertNotNull(edgeManager);
        Assert.assertTrue(edgeManager instanceof EdgeManagerForTest);
        Assert.assertTrue(Arrays.equals(create.deepCopyAsArray(), edgeManager.getUserPayload().deepCopyAsArray()));
    }

    @Test(timeout = 5000)
    public void testBasicVertexCompletion() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, vertexImpl.getCompletedTasks());
        Assert.assertTrue(0.5f == vertexImpl.getCompletedTaskProgress());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(2L, vertexImpl.getCompletedTasks());
        Assert.assertTrue(1.0f == vertexImpl.getCompletedTaskProgress());
        Assert.assertTrue(vertexImpl.initTimeRequested > 0);
        Assert.assertTrue(vertexImpl.initedTime > 0);
        Assert.assertTrue(vertexImpl.startTimeRequested > 0);
        Assert.assertTrue(vertexImpl.startedTime > 0);
        Assert.assertTrue(vertexImpl.finishTime > 0);
    }

    @Test(timeout = 5000)
    @Ignore
    public void testDuplicateTaskCompletion() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
    }

    @Test(timeout = 5000)
    public void testVertexFailure() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH).contains("task failed, taskid=" + tezTaskID.toString()));
    }

    @Test(timeout = 5000)
    public void testVertexKillDiagnosticsInInit() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex4");
        killVertex(vertexImpl);
        String lowerCase = StringUtils.join(vertexImpl.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH);
        LOG.info("diagnostics v2: " + lowerCase);
        Assert.assertTrue(lowerCase.contains("vertex received kill in inited state"));
    }

    @Test(timeout = 5000)
    public void testVertexKillDiagnosticsInRunning() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex3");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        killVertex(vertexImpl);
        String lowerCase = StringUtils.join(vertexImpl.getDiagnostics(), ",").toLowerCase(Locale.ENGLISH);
        Assert.assertTrue(lowerCase.contains("vertex received kill while in running state"));
        Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, vertexImpl.getTerminationCause());
        Assert.assertTrue(lowerCase.contains(vertexImpl.getTerminationCause().name().toLowerCase(Locale.ENGLISH)));
    }

    @Test(timeout = 5000)
    public void testVertexKillPending() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        this.dispatcher.getEventHandler().handle(new VertexEventTermination(vertexImpl.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(TezTaskID.getInstance(vertexImpl.getVertexId(), 0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(TezTaskID.getInstance(vertexImpl.getVertexId(), 1), TaskState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
    }

    @Test(timeout = 5000)
    public void testVertexKill() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        this.dispatcher.getEventHandler().handle(new VertexEventTermination(vertexImpl.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(TezTaskID.getInstance(vertexImpl.getVertexId(), 0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(TezTaskID.getInstance(vertexImpl.getVertexId(), 1), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.KILLED, vertexImpl.getState());
    }

    @Test(timeout = 5000)
    public void testKilledTasksHandling() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertEquals(TaskState.KILLED, vertexImpl.getTask(tezTaskID2).getState());
    }

    @Test(timeout = 5000)
    public void testVertexCommitterInit() {
        initAllVertices(VertexState.INITED);
        Assert.assertNull(this.vertices.get("vertex2").getOutputCommitter("output"));
        Assert.assertTrue(this.vertices.get("vertex6").getOutputCommitter("outputx") instanceof CountingOutputCommitter);
    }

    @Test(timeout = 5000)
    public void testVertexManagerInit() {
        initAllVertices(VertexState.INITED);
        Assert.assertTrue(this.vertices.get("vertex2").getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
        Assert.assertTrue(this.vertices.get("vertex6").getVertexManager().getPlugin() instanceof ShuffleVertexManager);
    }

    @Test(timeout = 5000)
    public void testVertexTaskFailure() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("outputx");
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.FAILED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertEquals(0L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(1L, countingOutputCommitter.abortCounter);
    }

    @Test(timeout = 5000)
    public void testVertexTaskAttemptProcessorFailure() throws Exception {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        startVertex(vertexImpl);
        this.dispatcher.await();
        TaskAttemptImpl taskAttemptImpl = (TaskAttemptImpl) vertexImpl.getTask(0).getAttempts().values().iterator().next();
        taskAttemptImpl.handle(new TaskAttemptEventSchedule(taskAttemptImpl.getID(), 2, 2));
        NodeId newInstance = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance2 = ContainerId.newInstance(this.appAttemptId, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance2);
        Mockito.when(container.getNodeId()).thenReturn(newInstance);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appContext);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appContext)).getAllContainers();
        taskAttemptImpl.handle(new TaskAttemptEventSubmitted(taskAttemptImpl.getID(), newInstance2));
        taskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptImpl.getID()));
        Assert.assertEquals(TaskAttemptStateInternal.RUNNING, taskAttemptImpl.getInternalState());
        taskAttemptImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptImpl.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.APPLICATION_ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testVertexTaskAttemptInputFailure() throws Exception {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        startVertex(vertexImpl);
        this.dispatcher.await();
        TaskAttemptImpl taskAttemptImpl = (TaskAttemptImpl) vertexImpl.getTask(0).getAttempts().values().iterator().next();
        taskAttemptImpl.handle(new TaskAttemptEventSchedule(taskAttemptImpl.getID(), 2, 2));
        NodeId newInstance = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance2 = ContainerId.newInstance(this.appAttemptId, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance2);
        Mockito.when(container.getNodeId()).thenReturn(newInstance);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appContext);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appContext)).getAllContainers();
        taskAttemptImpl.handle(new TaskAttemptEventSubmitted(taskAttemptImpl.getID(), newInstance2));
        taskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptImpl.getID()));
        Assert.assertEquals(TaskAttemptStateInternal.RUNNING, taskAttemptImpl.getInternalState());
        taskAttemptImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptImpl.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, taskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testVertexTaskAttemptOutputFailure() throws Exception {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        startVertex(vertexImpl);
        this.dispatcher.await();
        TaskAttemptImpl taskAttemptImpl = (TaskAttemptImpl) vertexImpl.getTask(0).getAttempts().values().iterator().next();
        taskAttemptImpl.handle(new TaskAttemptEventSchedule(taskAttemptImpl.getID(), 2, 2));
        NodeId newInstance = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newInstance2 = ContainerId.newInstance(this.appAttemptId, 3);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newInstance2);
        Mockito.when(container.getNodeId()).thenReturn(newInstance);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appContext);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appContext)).getAllContainers();
        taskAttemptImpl.handle(new TaskAttemptEventSubmitted(taskAttemptImpl.getID(), newInstance2));
        taskAttemptImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptImpl.getID()));
        Assert.assertEquals(TaskAttemptStateInternal.RUNNING, taskAttemptImpl.getInternalState());
        taskAttemptImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptImpl.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, taskAttemptImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testSourceVertexStartHandling() {
        LOG.info("Testing testSourceVertexStartHandling");
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex6");
        VertexImpl vertexImpl2 = this.vertices.get("vertex3");
        startVertex(this.vertices.get("vertex1"));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl2.getState());
        long j = vertexImpl2.startTimeRequested;
        Assert.assertEquals(1L, vertexImpl2.numStartedSourceVertices);
        Assert.assertTrue(j > 0);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(2L, vertexImpl2.numStartedSourceVertices);
        Assert.assertTrue(vertexImpl2.startTimeRequested > j);
        LOG.info("Verifying v6 state " + vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(3L, vertexImpl.getDistanceFromRoot());
    }

    @Test(timeout = 5000)
    public void testSourceTaskAttemptCompletionEvents() {
        LOG.info("Testing testSourceTaskAttemptCompletionEvents");
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex4");
        VertexImpl vertexImpl2 = this.vertices.get("vertex5");
        VertexImpl vertexImpl3 = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        LOG.info("Verifying v6 state " + vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        TezTaskID tezTaskID3 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 0);
        TezTaskID tezTaskID4 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 1);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(tezTaskID2, 0);
        TezTaskAttemptID tezTaskAttemptID4 = TezTaskAttemptID.getInstance(tezTaskID3, 0);
        TezTaskAttemptID tezTaskAttemptID5 = TezTaskAttemptID.getInstance(tezTaskID4, 0);
        TezTaskAttemptID tezTaskAttemptID6 = TezTaskAttemptID.getInstance(tezTaskID4, 0);
        vertexImpl.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.FAILED));
        vertexImpl.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID2, TaskAttemptStateInternal.SUCCEEDED));
        vertexImpl.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID3, TaskAttemptStateInternal.SUCCEEDED));
        vertexImpl2.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID4, TaskAttemptStateInternal.SUCCEEDED));
        vertexImpl2.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID5, TaskAttemptStateInternal.FAILED));
        vertexImpl2.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID6, TaskAttemptStateInternal.SUCCEEDED));
        vertexImpl.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        vertexImpl.handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        vertexImpl2.handle(new VertexEventTaskCompleted(tezTaskID3, TaskState.SUCCEEDED));
        vertexImpl2.handle(new VertexEventTaskCompleted(tezTaskID4, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(4L, vertexImpl3.numSuccessSourceAttemptCompletions);
    }

    @Test(timeout = 5000)
    public void testFailuresMaxPercentSourceTaskAttemptCompletionEvents() throws TezException {
        LOG.info("Testing testFailuresMaxPercentSourceTaskAttemptCompletionEvents");
        this.useCustomInitializer = false;
        this.customInitializer = null;
        setupPreDagCreation();
        this.conf.setFloat("tez.vertex.failures.maxpercent", 50.0f);
        this.conf.setInt("tez.am.task.max.failed.attempts", 1);
        this.dagPlan = createTestDAGPlan();
        setupPostDagCreation();
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex4");
        VertexImpl vertexImpl2 = this.vertices.get("vertex5");
        VertexImpl vertexImpl3 = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        LOG.info("Verifying v6 state " + vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        TezTaskID tezTaskID3 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 0);
        TezTaskID tezTaskID4 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 1);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(tezTaskID2, 0);
        TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(tezTaskID3, 0);
        TezTaskAttemptID tezTaskAttemptID4 = TezTaskAttemptID.getInstance(tezTaskID4, 0);
        TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList(), new ArrayList(), (List) null, this.conf);
        TaskLocationHint createTaskLocationHint = TaskLocationHint.createTaskLocationHint((Set) null, (Set) null);
        this.dispatcher.getEventHandler().handle(new TaskEventScheduleTask(tezTaskID, taskSpec, createTaskLocationHint, false));
        this.dispatcher.getEventHandler().handle(new TaskEventScheduleTask(tezTaskID2, taskSpec, createTaskLocationHint, false));
        this.dispatcher.getEventHandler().handle(new TaskEventTAFailed(tezTaskAttemptID, TaskFailureType.NON_FATAL, (TezAbstractEvent) null));
        this.dispatcher.getEventHandler().handle(new TaskEventTASucceeded(tezTaskAttemptID2));
        this.dispatcher.getEventHandler().handle(new TaskEventTASucceeded(tezTaskAttemptID3));
        this.dispatcher.getEventHandler().handle(new TaskEventTAFailed(tezTaskAttemptID4, TaskFailureType.NON_FATAL, (TezAbstractEvent) null));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(4L, vertexImpl3.numSuccessSourceAttemptCompletions);
    }

    @Test(timeout = 5000)
    public void testFailuresMaxPercentExceededSourceTaskAttemptCompletionEvents() throws TezException {
        LOG.info("Testing testFailuresMaxPercentSourceTaskAttemptCompletionEvents");
        this.useCustomInitializer = false;
        this.customInitializer = null;
        setupPreDagCreation();
        this.conf.setFloat("tez.vertex.failures.maxpercent", 50.0f);
        this.conf.setInt("tez.am.task.max.failed.attempts", 1);
        this.dagPlan = createTestDAGPlan();
        setupPostDagCreation();
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex4");
        VertexImpl vertexImpl2 = this.vertices.get("vertex5");
        VertexImpl vertexImpl3 = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        LOG.info("Verifying v6 state " + vertexImpl3.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        TezTaskID tezTaskID3 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 0);
        TezTaskID tezTaskID4 = TezTaskID.getInstance(vertexImpl2.getVertexId(), 1);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(tezTaskID2, 0);
        TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(tezTaskID3, 0);
        TezTaskAttemptID tezTaskAttemptID4 = TezTaskAttemptID.getInstance(tezTaskID4, 0);
        TaskSpec taskSpec = new TaskSpec("dag", "vertex", 2, new ProcessorDescriptor(), new ArrayList(), new ArrayList(), (List) null, this.conf);
        TaskLocationHint createTaskLocationHint = TaskLocationHint.createTaskLocationHint((Set) null, (Set) null);
        this.dispatcher.getEventHandler().handle(new TaskEventScheduleTask(tezTaskID, taskSpec, createTaskLocationHint, false));
        this.dispatcher.getEventHandler().handle(new TaskEventScheduleTask(tezTaskID2, taskSpec, createTaskLocationHint, false));
        this.dispatcher.getEventHandler().handle(new TaskEventTAFailed(tezTaskAttemptID, TaskFailureType.NON_FATAL, (TezAbstractEvent) null));
        this.dispatcher.getEventHandler().handle(new TaskEventTAFailed(tezTaskAttemptID2, TaskFailureType.NON_FATAL, (TezAbstractEvent) null));
        this.dispatcher.getEventHandler().handle(new TaskEventTASucceeded(tezTaskAttemptID3));
        this.dispatcher.getEventHandler().handle(new TaskEventTAFailed(tezTaskAttemptID4, TaskFailureType.NON_FATAL, (TezAbstractEvent) null));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(2L, vertexImpl3.numSuccessSourceAttemptCompletions);
    }

    @Test(timeout = 5000)
    public void testDAGEventGeneration() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(1L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout = 5000)
    public void testTaskReschedule() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(tezTaskID));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
    }

    @Test(timeout = 5000)
    public void testTerminatingVertexForTaskComplete() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan(false);
        setupPostDagCreation();
        VertexImpl vertexImpl = (VertexImpl) Mockito.spy(this.vertices.get("A"));
        initVertex(vertexImpl);
        startVertex(vertexImpl);
        vertexImpl.handle(new VertexEventTermination(vertexImpl.getVertexId(), VertexTerminationCause.INTERNAL_ERROR));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl.inTerminalState());
        Iterator it = vertexImpl.getDiagnostics().iterator();
        while (it.hasNext()) {
            if (((String) it.next()).contains("Invalid event")) {
                Assert.fail("Unexpected Invalid event transition!");
            }
        }
    }

    @Test(timeout = 5000)
    public void testTerminatingVertexForVComplete() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan(false);
        setupPostDagCreation();
        VertexImpl vertexImpl = (VertexImpl) Mockito.spy(this.vertices.get("A"));
        initVertex(vertexImpl);
        startVertex(vertexImpl);
        vertexImpl.handle(new VertexEventTermination(vertexImpl.getVertexId(), VertexTerminationCause.INTERNAL_ERROR));
        vertexImpl.handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_COMPLETED));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl.inTerminalState());
        Iterator it = vertexImpl.getDiagnostics().iterator();
        while (it.hasNext()) {
            if (((String) it.next()).contains("Invalid event")) {
                Assert.fail("Unexpected Invalid event transition!");
            }
        }
    }

    @Test(timeout = 5000)
    public void testVertexSuccessToRunningAfterTaskScheduler() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(1L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(tezTaskID));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_RERUNNING).intValue());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(2L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout = 5000)
    public void testVertexSuccessToFailedAfterTaskScheduler() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        ArrayList arrayList = new ArrayList();
        arrayList.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig().toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        vertexImpl.setAdditionalOutputs(arrayList);
        initAllVertices(VertexState.INITED);
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(1L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(tezTaskID));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(2L, this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout = 5000)
    public void testVertexCommit() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("outputx");
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(1L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(0L, countingOutputCommitter.abortCounter);
        Assert.assertEquals(1L, countingOutputCommitter.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
    }

    @Test(timeout = 5000)
    public void testTaskFailedAfterVertexSuccess() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex6");
        startVertex(this.vertices.get("vertex1"));
        startVertex(this.vertices.get("vertex2"));
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("outputx");
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(1L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(0L, countingOutputCommitter.abortCounter);
        Assert.assertEquals(1L, countingOutputCommitter.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(1L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(1L, countingOutputCommitter.abortCounter);
    }

    @Test(timeout = 5000)
    public void testBadCommitter() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        ArrayList arrayList = new ArrayList();
        arrayList.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(true, true, false).toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        vertexImpl.setAdditionalOutputs(arrayList);
        initAllVertices(VertexState.INITED);
        startVertex(vertexImpl);
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("output_v2");
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertEquals(1L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(1L, countingOutputCommitter.abortCounter);
        Assert.assertEquals(1L, countingOutputCommitter.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
    }

    @Test(timeout = 5000)
    public void testBadCommitter2() throws Exception {
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        ArrayList arrayList = new ArrayList();
        arrayList.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(true, true, true).toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        vertexImpl.setAdditionalOutputs(arrayList);
        initAllVertices(VertexState.INITED);
        startVertex(vertexImpl);
        CountingOutputCommitter countingOutputCommitter = (CountingOutputCommitter) vertexImpl.getOutputCommitter("output_v2");
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(VertexTerminationCause.COMMIT_FAILURE, vertexImpl.getTerminationCause());
        Assert.assertEquals(1L, countingOutputCommitter.commitCounter);
        Assert.assertEquals(1L, countingOutputCommitter.abortCounter);
        Assert.assertEquals(1L, countingOutputCommitter.initCounter);
        Assert.assertEquals(1L, countingOutputCommitter.setupCounter);
    }

    @Test(timeout = 5000)
    public void testVertexInitWithCustomVertexManager() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createDAGWithCustomVertexManager();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("v1");
        VertexImpl vertexImpl2 = this.vertices.get("v2");
        VertexImpl vertexImpl3 = this.vertices.get("v3");
        initVertex(vertexImpl);
        initVertex(vertexImpl2);
        this.dispatcher.await();
        Assert.assertEquals(-1L, vertexImpl.getTotalTasks());
        Assert.assertTrue(0.0f == vertexImpl.getCompletedTaskProgress());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl.getState());
        Assert.assertEquals(-1L, vertexImpl2.getTotalTasks());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl2.getState());
        Assert.assertEquals(-1L, vertexImpl3.getTotalTasks());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(-1L, vertexImpl.getTotalTasks());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl.getState());
        vertexImpl.reconfigureVertex(3, (VertexLocationHint) null, (Map) null);
        vertexImpl2.reconfigureVertex(3, (VertexLocationHint) null, (Map) null);
        this.dispatcher.await();
        Assert.assertEquals(3, vertexImpl.getTotalTasks());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(3, vertexImpl2.getTotalTasks());
        Assert.assertEquals(VertexState.INITED, vertexImpl2.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        Assert.assertTrue(vertexImpl3.numStartedSourceVertices > 0);
        long j = vertexImpl3.startTimeRequested;
        Assert.assertTrue(j > 0);
        vertexImpl3.reconfigureVertex(3, (VertexLocationHint) null, (Map) null);
        this.dispatcher.await();
        Assert.assertEquals(3, vertexImpl3.getTotalTasks());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertEquals(j, vertexImpl3.startTimeRequested);
    }

    @Test(timeout = 5000)
    public void testVertexManagerHeuristic() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithMixedEdges();
        setupPostDagCreation();
        initAllVertices(VertexState.INITED);
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex1").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ShuffleVertexManager.class, this.vertices.get("vertex2").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(InputReadyVertexManager.class, this.vertices.get("vertex3").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex4").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex5").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(InputReadyVertexManager.class, this.vertices.get("vertex6").getVertexManager().getPlugin().getClass());
    }

    @Test(timeout = 5000)
    public void testVertexWithOneToOneSplit() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanForOneToOneSplit("TestInputInitializer", -1, true);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        VertexImpl vertexImpl = this.vertices.get("vertex5");
        initVertex(vertexImplWithControlledInitializerManager);
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        EdgeManagerPluginDescriptor create = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        Edge edge = (Edge) vertexImpl.sourceVertices.get(vertexImplWithControlledInitializerManager);
        Assert.assertNull(edge.getEdgeManager());
        edge.setCustomEdgeManager(create);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(VertexState.INITED, this.vertices.get("vertex5").getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints = createTaskLocationHints(5);
        rootInputInitializerManager.completeInputInitialization(0, 5, createTaskLocationHints);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(5, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        for (int i = 0; i < createTaskLocationHints.size(); i++) {
            Assert.assertEquals(createTaskLocationHints.get(i), vertexImplWithControlledInitializerManager.getTaskLocationHints()[i]);
        }
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        startVertex(vertexImplWithControlledInitializerManager);
        Assert.assertEquals(5, this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals(VertexState.INITIALIZING, this.vertices.get("vertex4").getState());
        Assert.assertEquals(VertexState.INITIALIZING, this.vertices.get("vertex6").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex2").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex3").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex5").getState());
        Assert.assertEquals(VertexState.INITIALIZING, this.vertices.get("vertex4").getState());
        Assert.assertEquals(VertexState.INITIALIZING, this.vertices.get("vertex6").getState());
        EdgeManagerPluginDescriptor create2 = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        Edge edge2 = (Edge) this.vertices.get("vertex6").sourceVertices.get(this.vertices.get("vertex4"));
        Assert.assertNull(edge2.getEdgeManager());
        edge2.setCustomEdgeManager(create2);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex4").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex6").getState());
    }

    @Test(timeout = 5000)
    public void testVertexWithOneToOneSplitWhileRunning() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanForOneToOneSplit(null, 5, false);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        vertexImpl.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), vertexImpl, this.appContext, (StateChangeNotifier) Mockito.mock(StateChangeNotifier.class));
        vertexImpl.vertexManager.initialize();
        startVertex(vertexImpl);
        this.dispatcher.await();
        Assert.assertEquals(5, this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex2").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex3").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex4").getState());
        vertexImpl.reconfigureVertex(3, (VertexLocationHint) null, (Map) null);
        vertexImpl.doneReconfiguringVertex();
        this.dispatcher.await();
        Assert.assertEquals(3, this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals(3, this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals(3, this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex2").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex3").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex4").getState());
    }

    @Test(timeout = 5000)
    public void testVertexWithOneToOneSplitWhileInited() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanForOneToOneSplit(null, 5, false);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        vertexImpl.vertexReconfigurationPlanned();
        initAllVertices(VertexState.INITED);
        vertexImpl.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), vertexImpl, this.appContext, (StateChangeNotifier) Mockito.mock(StateChangeNotifier.class));
        vertexImpl.vertexManager.initialize();
        Assert.assertEquals(5, this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals(5, this.vertices.get("vertex4").getTotalTasks());
        vertexImpl.reconfigureVertex(3, (VertexLocationHint) null, (Map) null);
        vertexImpl.doneReconfiguringVertex();
        this.dispatcher.await();
        Assert.assertEquals(3, this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals(3, this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals(3, this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals(VertexState.INITED, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexState.INITED, this.vertices.get("vertex2").getState());
        Assert.assertEquals(VertexState.INITED, this.vertices.get("vertex3").getState());
        Assert.assertEquals(VertexState.INITED, this.vertices.get("vertex4").getState());
        startVertex(vertexImpl);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex2").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex3").getState());
        Assert.assertEquals(VertexState.RUNNING, this.vertices.get("vertex4").getState());
    }

    @Test(timeout = 5000)
    public void testVertexVMErrorReport() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanForOneToOneSplit(null, 5, false);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        initAllVertices(VertexState.INITED);
        VertexManagerPluginForTest.VertexManagerPluginForTestConfig vertexManagerPluginForTestConfig = new VertexManagerPluginForTest.VertexManagerPluginForTestConfig();
        vertexManagerPluginForTestConfig.setReconfigureOnStart(true);
        vertexImpl.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()).setUserPayload(UserPayload.create(vertexManagerPluginForTestConfig.getPayload())), UserGroupInformation.getCurrentUser(), vertexImpl, this.appContext, (StateChangeNotifier) Mockito.mock(StateChangeNotifier.class));
        vertexImpl.vertexManager.initialize();
        startVertex(vertexImpl, false);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, this.vertices.get("vertex1").getState());
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, this.vertices.get("vertex1").getTerminationCause());
        Assert.assertTrue(Joiner.on(":").join(this.vertices.get("vertex1").getDiagnostics()).contains("context.vertexReconfigurationPlanned() before re-configuring"));
    }

    @Test(timeout = 5000)
    public void testInvalidEvent() {
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, vertexImpl.getState());
        Assert.assertEquals(1L, this.dagEventDispatcher.eventCount.get(DAGEventType.INTERNAL_ERROR).intValue());
    }

    @Test(timeout = 5000)
    public void testVertexWithInitializerFailure() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        rootInputInitializerManager.failInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE.name()));
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains("MockInitializerFailed"));
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager2 = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex2");
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager2.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager2 = vertexImplWithControlledInitializerManager2.getRootInputInitializerManager();
        rootInputInitializerManager2.failInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager2.getState());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager2.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager2.hasShutDown));
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE.name()));
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains("MockInitializerFailed"));
    }

    @Test(timeout = 10000)
    public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new RootInitializerSettingParallelismTo0(null);
        RootInitializerSettingParallelismTo0 rootInitializerSettingParallelismTo0 = (RootInitializerSettingParallelismTo0) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInitializer0Tasks(RootInitializerSettingParallelismTo0.class.getName());
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        initVertex(vertexImpl2);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl2.getVertexId(), 0);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezEvent tezEvent = new TezEvent(DataMovementEvent.create((ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex1", tezTaskAttemptID));
        LinkedList linkedList = new LinkedList();
        linkedList.add(tezEvent);
        vertexImpl.handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), linkedList));
        startVertex(vertexImpl2);
        this.dispatcher.await();
        vertexImpl2.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
        vertexImpl2.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl2.getState());
        while (true) {
            if (vertexImpl.getState() != VertexState.INITIALIZING && vertexImpl.getState() != VertexState.INITED) {
                break;
            }
            rootInitializerSettingParallelismTo0.go();
            Thread.sleep(10L);
        }
        while (vertexImpl.getState() != VertexState.SUCCEEDED) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
    }

    @Test(timeout = 10000)
    public void testInputInitializerVertexStateUpdates() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        eventHandlingRootInputInitializer.setNumVertexStateUpdateEvents(3);
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Iterator it = vertexImplWithRunningInputInitializer.getTasks().keySet().iterator();
        while (it.hasNext()) {
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted((TezTaskID) it.next(), TaskState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImplWithRunningInputInitializer.getState());
        eventHandlingRootInputInitializer.waitForVertexStateUpdate();
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout = 10000)
    public void testInputInitializerEventMultipleAttempts() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer4();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer3 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex3");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithRunningInputInitializer3.getState());
        InputInitializerEvent create = InputInitializerEvent.create("vertex3", "input1", ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0));
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(create, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(tezTaskID, 0))))));
        this.dispatcher.await();
        ByteBuffer putInt = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 1);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex3", "input1", putInt), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(tezTaskID, 1))))));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = vertexImplWithRunningInputInitializer3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(2L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        for (TezTaskID tezTaskID2 : vertexImplWithRunningInputInitializer.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID2, 1);
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer.getName(), tezTaskID2, tezTaskAttemptID.getId());
        }
        this.dispatcher.await();
        while (vertexImplWithRunningInputInitializer3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer3.getState());
        Assert.assertEquals(1L, eventHandlingRootInputInitializer.initializerEvents.size());
        Assert.assertEquals(putInt, ((InputInitializerEvent) eventHandlingRootInputInitializer.initializerEvents.get(0)).getUserPayload());
    }

    @Test(timeout = 10000)
    public void testInputInitializerEventsMultipleSources() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        eventHandlingRootInputInitializer.setNumExpectedEvents(4);
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer4();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer3 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex3");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithRunningInputInitializer3.getState());
        LinkedList linkedList = new LinkedList();
        ByteBuffer putInt = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
        linkedList.add(putInt);
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex3", "input1", putInt), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0), 0))))));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = vertexImplWithRunningInputInitializer3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(1L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        for (TezTaskID tezTaskID : vertexImplWithRunningInputInitializer.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer.getName(), tezTaskID, tezTaskAttemptID.getId());
        }
        this.dispatcher.await();
        Assert.assertEquals(1L, eventHandlingRootInputInitializer.initializerEvents.size());
        Assert.assertEquals(2L, vertexImplWithRunningInputInitializer2.getTotalTasks());
        Iterator it = vertexImplWithRunningInputInitializer2.getTasks().values().iterator();
        while (it.hasNext()) {
            TezTaskID taskId = ((Task) it.next()).getTaskId();
            TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(taskId, 0);
            int id = taskId.getId() + 1;
            for (int i = 0; i < id; i++) {
                ByteBuffer putInt2 = ByteBuffer.allocate(12).putInt(0, 2).putInt(4, taskId.getId()).putInt(8, i);
                linkedList.add(putInt2);
                this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer2.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex3", "input1", putInt2), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", tezTaskAttemptID2)))));
                this.dispatcher.await();
            }
        }
        Assert.assertEquals(1L, initializerWrapper.getPendingEvents().keySet().size());
        Assert.assertEquals(3L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer2.getName()).size());
        for (TezTaskID tezTaskID2 : vertexImplWithRunningInputInitializer2.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID3 = TezTaskAttemptID.getInstance(tezTaskID2, 0);
            vertexImplWithRunningInputInitializer2.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID3, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer2.handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer2.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer2.getName(), tezTaskID2, tezTaskAttemptID3.getId());
        }
        this.dispatcher.await();
        while (vertexImplWithRunningInputInitializer3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer3.getState());
        Assert.assertEquals(4L, eventHandlingRootInputInitializer.initializerEvents.size());
        Assert.assertTrue(eventHandlingRootInputInitializer.initComplete.get());
        Assert.assertEquals(2L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(0L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        Iterator it2 = eventHandlingRootInputInitializer.initializerEvents.iterator();
        while (it2.hasNext()) {
            linkedList.remove(((InputInitializerEvent) it2.next()).getUserPayload());
        }
        Assert.assertEquals(0L, linkedList.size());
    }

    @Test(timeout = 10000)
    public void testInputInitializerEventNoDirectConnection() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer4();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer3 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex3");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithRunningInputInitializer3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex3", "input1", (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0), 0))))));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = vertexImplWithRunningInputInitializer3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(1L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        for (TezTaskID tezTaskID : vertexImplWithRunningInputInitializer.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer.getName(), tezTaskID, tezTaskAttemptID.getId());
        }
        this.dispatcher.await();
        while (vertexImplWithRunningInputInitializer3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer3.getState());
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(0L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        Assert.assertTrue(eventHandlingRootInputInitializer.eventReceived.get());
        Assert.assertEquals(3L, eventHandlingRootInputInitializer.stateUpdates.size());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout = 10000)
    public void testInputInitializerEventsAtNew() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer3();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer3 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex3");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.NEW, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertEquals(VertexState.NEW, vertexImplWithRunningInputInitializer3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex3", "input1", (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0), 0))))));
        this.dispatcher.await();
        Assert.assertEquals(1L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        for (TezTaskID tezTaskID : vertexImplWithRunningInputInitializer.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
            TaskImpl task = vertexImplWithRunningInputInitializer.getTask(tezTaskID);
            task.handle(new TaskEvent(tezTaskID, TaskEventType.T_ATTEMPT_LAUNCHED));
            task.handle(new TaskEventTASucceeded(tezTaskAttemptID));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer.getName(), tezTaskID, tezTaskAttemptID.getId());
        }
        this.dispatcher.await();
        Assert.assertEquals(1L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        Assert.assertEquals(VertexState.NEW, vertexImplWithRunningInputInitializer3.getState());
        initVertex(vertexImplWithRunningInputInitializer2);
        startVertex(vertexImplWithRunningInputInitializer2);
        this.dispatcher.await();
        while (vertexImplWithRunningInputInitializer3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer3.getState());
        Assert.assertEquals(0L, vertexImplWithRunningInputInitializer3.pendingInitializerEvents.size());
        Assert.assertTrue(eventHandlingRootInputInitializer.eventReceived.get());
        Assert.assertEquals(3L, eventHandlingRootInputInitializer.stateUpdates.size());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.RUNNING, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.SUCCEEDED, ((VertexStateUpdate) eventHandlingRootInputInitializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout = 10000)
    public void testInputInitializerEvents() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithRunningInputInitializer2.getState());
        this.dispatcher.await();
        while (!eventHandlingRootInputInitializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        Assert.assertFalse(eventHandlingRootInputInitializer.eventReceived.get());
        Assert.assertFalse(eventHandlingRootInputInitializer.initComplete.get());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(new TezEvent(InputInitializerEvent.create("vertex2", "input1", (ByteBuffer) null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0), 0))))));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImplWithRunningInputInitializer2.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = vertexImplWithRunningInputInitializer2.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(1L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
        for (TezTaskID tezTaskID : vertexImplWithRunningInputInitializer.getTasks().keySet()) {
            TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskAttemptCompleted(tezTaskAttemptID, TaskAttemptStateInternal.SUCCEEDED));
            vertexImplWithRunningInputInitializer.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
            this.dispatcher.await();
            vertexImplWithRunningInputInitializer.stateChangeNotifier.taskSucceeded(vertexImplWithRunningInputInitializer.getName(), tezTaskID, tezTaskAttemptID.getId());
        }
        while (!eventHandlingRootInputInitializer.eventReceived.get()) {
            Thread.sleep(10L);
        }
        while (!eventHandlingRootInputInitializer.initComplete.get()) {
            Thread.sleep(10L);
        }
        while (vertexImplWithRunningInputInitializer2.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(1L, initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals(0L, initializerWrapper.getPendingEvents().get(vertexImplWithRunningInputInitializer.getName()).size());
    }

    @Test(timeout = 5000)
    public void testTaskSchedulingWithCustomEdges() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createCustomDAGWithCustomEdges();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("M2");
        VertexImpl vertexImpl2 = this.vertices.get("M7");
        VertexImpl vertexImpl3 = this.vertices.get("R3");
        VertexImpl vertexImpl4 = this.vertices.get("M5");
        VertexImpl vertexImpl5 = this.vertices.get("M8");
        VertexImpl vertexImpl6 = this.vertices.get("M9");
        initVertex(vertexImpl);
        initVertex(vertexImpl2);
        initVertex(vertexImpl5);
        initVertex(vertexImpl6);
        Assert.assertTrue(vertexImpl2.getState().equals(VertexState.INITED));
        Assert.assertTrue(vertexImpl6.getState().equals(VertexState.INITED));
        Assert.assertTrue(vertexImpl4.getState().equals(VertexState.INITED));
        Assert.assertTrue(vertexImpl5.getState().equals(VertexState.INITED));
        Assert.assertTrue(vertexImpl4.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl6.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl3.getState().equals(VertexState.RUNNING));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl4.getState().equals(VertexState.INITED));
        Iterator it = vertexImpl4.getTasks().values().iterator();
        while (it.hasNext()) {
            Assert.assertTrue(((Task) it.next()).getState().equals(TaskState.NEW));
        }
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl5.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl6.getState().equals(VertexState.SUCCEEDED));
        Assert.assertTrue(vertexImpl4.getState().equals(VertexState.RUNNING));
        Iterator it2 = vertexImpl4.getTasks().values().iterator();
        while (it2.hasNext()) {
            Assert.assertTrue(((Task) it2.next()).getState().equals(TaskState.SCHEDULED));
        }
    }

    private DAGProtos.DAGPlan createCustomDAGWithCustomEdges() {
        return DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("M2").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M2.class").build()).addOutEdgeId("M2_R3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M8").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M8.class").build()).addOutEdgeId("M8_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M9").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(0).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M9.class").build()).addOutEdgeId("M9_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("R3").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("R3.class").build()).addInEdgeId("M2_R3").addOutEdgeId("R3_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M5").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M5.class").build()).addInEdgeId("R3_M5").addInEdgeId("M7_M5").addInEdgeId("M8_M5").addInEdgeId("M9_M5").addOutEdgeId("M5_R6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M7").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M7.class").build()).addOutEdgeId("M7_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("R6").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R6.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("R6.class").build()).addInEdgeId("M5_R6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2_R3")).setInputVertexName("M2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2_R3.class")).setOutputVertexName("R3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("M2_R3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R3_M5")).setInputVertexName("R3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R3_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("R3_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7_M5")).setInputVertexName("M7").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("M7_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5_R6")).setInputVertexName("M5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5_R6.class")).setOutputVertexName("R6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("M5_R6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9_M5")).setInputVertexName("M9").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("M9_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8_M5")).setInputVertexName("M8").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class")).setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(EdgeManagerForTest.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom(this.edgePayload))).build()).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("M8_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
    }

    @Test(timeout = 5000)
    public void testVertexWithMultipleInitializers1() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        rootInputInitializerManager.completeInputInitialization(0, 5, createTaskLocationHints(5));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(1L, vertexImplWithControlledInitializerManager.numInitializerCompletionsHandled);
        rootInputInitializerManager.completeInputInitialization(1);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(2L, vertexImplWithControlledInitializerManager.numInitializerCompletionsHandled);
    }

    @Test(timeout = 5000)
    public void testVertexWithMultipleInitializers2() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithMultipleInitializers("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints = createTaskLocationHints(5);
        rootInputInitializerManager.completeInputInitialization(1);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(1L, vertexImplWithControlledInitializerManager.numInitializerCompletionsHandled);
        rootInputInitializerManager.completeInputInitialization(0, 5, createTaskLocationHints);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(2L, vertexImplWithControlledInitializerManager.numInitializerCompletionsHandled);
    }

    @Test(timeout = 500000)
    public void testVertexWithInitializerSuccess() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints = createTaskLocationHints(5);
        rootInputInitializerManager.completeInputInitialization(0, 5, createTaskLocationHints);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(5L, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        for (int i = 0; i < createTaskLocationHints.size(); i++) {
            Assert.assertEquals(createTaskLocationHints.get(i), vertexImplWithControlledInitializerManager.getTaskLocationHints()[i]);
        }
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        for (int i2 = 0; i2 < 5; i2++) {
            List inputSpecList = vertexImplWithControlledInitializerManager.getInputSpecList(i2);
            Assert.assertEquals(1L, inputSpecList.size());
            Assert.assertEquals(1L, ((InputSpec) inputSpecList.get(0)).getPhysicalEdgeCount());
        }
        LinkedList linkedList = new LinkedList();
        for (int i3 = 0; i3 < vertexImplWithControlledInitializerManager.getTotalTasks(); i3++) {
            linkedList.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i3, (TaskLocationHint) null));
        }
        vertexImplWithControlledInitializerManager.scheduleTasks(linkedList);
        this.dispatcher.await();
        for (int i4 = 0; i4 < vertexImplWithControlledInitializerManager.getTotalTasks(); i4++) {
            Assert.assertEquals(1L, vertexImplWithControlledInitializerManager.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(vertexImplWithControlledInitializerManager.getTask(i4).getTaskId(), 0), 0, 0, 100).getEvents().size());
        }
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager2 = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex2");
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager2.getState());
        LinkedList newLinkedList = Lists.newLinkedList();
        newLinkedList.add(new TezEvent(VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithControlledInitializerManager.getVertexId(), 0), 0))));
        newLinkedList.add(new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", (TezTaskAttemptID) null)));
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithControlledInitializerManager2.getVertexId(), newLinkedList));
        this.dispatcher.await();
        RootInputInitializerManagerControlled rootInputInitializerManager2 = vertexImplWithControlledInitializerManager2.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints2 = createTaskLocationHints(10);
        rootInputInitializerManager2.completeInputInitialization(0, 10, createTaskLocationHints2);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager2.getState());
        Assert.assertEquals(10L, vertexImplWithControlledInitializerManager2.getTotalTasks());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager2.getVertexManager().getPlugin().getClass().getName());
        for (int i5 = 0; i5 < createTaskLocationHints2.size(); i5++) {
            Assert.assertEquals(createTaskLocationHints2.get(i5), vertexImplWithControlledInitializerManager2.getTaskLocationHints()[i5]);
        }
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager2.hasShutDown));
        LinkedList linkedList2 = new LinkedList();
        for (int i6 = 0; i6 < vertexImplWithControlledInitializerManager2.getTotalTasks(); i6++) {
            linkedList2.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i6, (TaskLocationHint) null));
        }
        vertexImplWithControlledInitializerManager2.scheduleTasks(linkedList2);
        this.dispatcher.await();
        int i7 = 0;
        while (i7 < vertexImplWithControlledInitializerManager2.getTotalTasks()) {
            Assert.assertEquals(i7 == 0 ? 2 : 1, vertexImplWithControlledInitializerManager2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance(vertexImplWithControlledInitializerManager2.getTask(i7).getTaskId(), 0), 0, 0, 100).getEvents().size());
            i7++;
        }
        for (int i8 = 0; i8 < 10; i8++) {
            List inputSpecList2 = vertexImplWithControlledInitializerManager.getInputSpecList(i8);
            Assert.assertEquals(1L, inputSpecList2.size());
            Assert.assertEquals(1L, ((InputSpec) inputSpecList2.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout = 5000)
    public void testVertexWithInitializerSuccessLegacyRouting() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints = createTaskLocationHints(5);
        rootInputInitializerManager.completeInputInitialization(0, 5, createTaskLocationHints);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(5L, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        for (int i = 0; i < createTaskLocationHints.size(); i++) {
            Assert.assertEquals(createTaskLocationHints.get(i), vertexImplWithControlledInitializerManager.getTaskLocationHints()[i]);
        }
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        for (int i2 = 0; i2 < 5; i2++) {
            List inputSpecList = vertexImplWithControlledInitializerManager.getInputSpecList(i2);
            Assert.assertEquals(1L, inputSpecList.size());
            Assert.assertEquals(1L, ((InputSpec) inputSpecList.get(0)).getPhysicalEdgeCount());
        }
        Assert.assertEquals(5L, vertexImplWithControlledInitializerManager.pendingTaskEvents.size());
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager2 = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex2");
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager2.getState());
        LinkedList newLinkedList = Lists.newLinkedList();
        newLinkedList.add(new TezEvent(VertexManagerEvent.create("vertex2", ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithControlledInitializerManager.getVertexId(), 0), 0))));
        newLinkedList.add(new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", (TezTaskAttemptID) null)));
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithControlledInitializerManager2.getVertexId(), newLinkedList));
        this.dispatcher.await();
        Assert.assertEquals(1L, vertexImplWithControlledInitializerManager2.pendingTaskEvents.size());
        RootInputInitializerManagerControlled rootInputInitializerManager2 = vertexImplWithControlledInitializerManager2.getRootInputInitializerManager();
        List<TaskLocationHint> createTaskLocationHints2 = createTaskLocationHints(10);
        rootInputInitializerManager2.completeInputInitialization(0, 10, createTaskLocationHints2);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager2.getState());
        Assert.assertEquals(10L, vertexImplWithControlledInitializerManager2.getTotalTasks());
        Assert.assertEquals(RootInputVertexManager.class.getName(), vertexImplWithControlledInitializerManager2.getVertexManager().getPlugin().getClass().getName());
        for (int i3 = 0; i3 < createTaskLocationHints2.size(); i3++) {
            Assert.assertEquals(createTaskLocationHints2.get(i3), vertexImplWithControlledInitializerManager2.getTaskLocationHints()[i3]);
        }
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager2.hasShutDown));
        Assert.assertEquals(11L, vertexImplWithControlledInitializerManager2.pendingTaskEvents.size());
        for (int i4 = 0; i4 < 10; i4++) {
            List inputSpecList2 = vertexImplWithControlledInitializerManager.getInputSpecList(i4);
            Assert.assertEquals(1L, inputSpecList2.size());
            Assert.assertEquals(1L, ((InputSpec) inputSpecList2.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout = 5000)
    public void testVertexWithInputDistributor() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputDistributor("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager2 = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager2.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        byte[] bArr = new byte[0];
        rootInputInitializerManager.completeInputDistribution(bArr);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        Assert.assertEquals(2L, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertArrayEquals(bArr, ((InputSpec) vertexImplWithControlledInitializerManager.getInputSpecList(0).get(0)).getInputDescriptor().getUserPayload().deepCopyAsArray());
        EdgeManagerPluginDescriptor create = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        Edge edge = (Edge) vertexImplWithControlledInitializerManager2.sourceVertices.get(vertexImplWithControlledInitializerManager);
        Assert.assertNull(edge.getEdgeManager());
        edge.setCustomEdgeManager(create);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager2.getState());
    }

    @Test(timeout = 5000)
    public void testVertexRootInputSpecUpdateAll() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex3");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        rootInputInitializerManager.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(5, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        for (int i = 0; i < 5; i++) {
            List inputSpecList = vertexImplWithControlledInitializerManager.getInputSpecList(i);
            Assert.assertEquals(1L, inputSpecList.size());
            Assert.assertEquals(4L, ((InputSpec) inputSpecList.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout = 5000)
    public void testVertexRootInputSpecUpdatePerTask() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithInputInitializer("TestInputInitializer");
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex4");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithControlledInitializerManager.getState());
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        rootInputInitializerManager.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(5, vertexImplWithControlledInitializerManager.getTotalTasks());
        Assert.assertEquals(RootInputSpecUpdaterVertexManager.class.getName(), vertexImplWithControlledInitializerManager.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals(true, Boolean.valueOf(rootInputInitializerManager.hasShutDown));
        for (int i = 0; i < 5; i++) {
            List inputSpecList = vertexImplWithControlledInitializerManager.getInputSpecList(i);
            Assert.assertEquals(1L, inputSpecList.size());
            Assert.assertEquals(i + 1, ((InputSpec) inputSpecList.get(0)).getPhysicalEdgeCount());
        }
    }

    private List<TaskLocationHint> createTaskLocationHints(int i) {
        ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(i);
        for (int i2 = 0; i2 < i; i2++) {
            newArrayListWithCapacity.add(TaskLocationHint.createTaskLocationHint(Sets.newSet(new String[]{"host" + i2}), (Set) null));
        }
        return newArrayListWithCapacity;
    }

    @Test(timeout = 5000)
    public void testVertexWithNoTasks() {
        TezVertexID tezVertexID = null;
        try {
            tezVertexID = TezVertexID.getInstance(TezDAGID.getInstance(this.dagId.getApplicationId(), 1000), 1);
            DAGProtos.VertexPlan vertex = this.invalidDagPlan.getVertex(0);
            VertexImpl vertexImpl = new VertexImpl(tezVertexID, vertex, vertex.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, true, this.appContext, this.vertexLocationHint, (Map) null, taskSpecificLaunchCmdOption, this.updateTracker, new Configuration(false));
            vertexImpl.setInputVertices(new HashMap());
            this.vertexIdMap.put(tezVertexID, vertexImpl);
            this.vertices.put(vertexImpl.getName(), vertexImpl);
            vertexImpl.handle(new VertexEvent(tezVertexID, VertexEventType.V_INIT));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
            Assert.assertTrue(0.0f == vertexImpl.getCompletedTaskProgress());
            vertexImpl.handle(new VertexEvent(tezVertexID, VertexEventType.V_START));
            this.dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
            Assert.assertTrue(1.0f == vertexImpl.getCompletedTaskProgress());
            if (tezVertexID != null) {
                this.vertexIdMap.remove(tezVertexID);
            }
        } catch (Throwable th) {
            if (tezVertexID != null) {
                this.vertexIdMap.remove(tezVertexID);
            }
            throw th;
        }
    }

    @Test(timeout = 5000)
    public void testVertexGroupInput() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createVertexGroupDAGPlan();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertNull(vertexImpl.getGroupInputSpecList());
        Assert.assertNull(vertexImpl2.getGroupInputSpecList());
        List groupInputSpecList = vertexImpl3.getGroupInputSpecList();
        Assert.assertEquals(1L, groupInputSpecList.size());
        Assert.assertEquals("Group", ((GroupInputSpec) groupInputSpecList.get(0)).getGroupName());
        Assert.assertTrue(((GroupInputSpec) groupInputSpecList.get(0)).getGroupVertices().contains("A"));
        Assert.assertTrue(((GroupInputSpec) groupInputSpecList.get(0)).getGroupVertices().contains("B"));
        ((GroupInputSpec) groupInputSpecList.get(0)).getMergedInputDescriptor().getClassName().equals("Group.class");
    }

    @Test(timeout = 5000)
    public void testStartWithUninitializedCustomEdge() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan(true);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        EdgeManagerPluginDescriptor create = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        Edge edge = (Edge) vertexImpl3.sourceVertices.get(vertexImpl);
        Assert.assertNull(edge.getEdgeManager());
        edge.setCustomEdgeManager(create);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        EdgeProperty create2 = EdgeProperty.create(create, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("B", create2);
        vertexImpl3.reconfigureVertex(2, this.vertexLocationHint, newHashMap);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertNotNull(vertexImpl.getTask(0));
        Assert.assertNotNull(vertexImpl2.getTask(0));
        Assert.assertNotNull(vertexImpl3.getTask(0));
    }

    @Test(timeout = 5000)
    public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan(true);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        TestUpdateListener testUpdateListener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates(vertexImpl2.getName(), EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), testUpdateListener);
        vertexImpl2.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create(VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), vertexImpl2, this.appContext, (StateChangeNotifier) Mockito.mock(StateChangeNotifier.class));
        vertexImpl2.vertexReconfigurationPlanned();
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        EdgeManagerPluginDescriptor create = EdgeManagerPluginDescriptor.create(EdgeManagerForTest.class.getName());
        Edge edge = (Edge) vertexImpl3.sourceVertices.get(vertexImpl);
        Assert.assertNull(edge.getEdgeManager());
        edge.setCustomEdgeManager(create);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        vertexImpl2.doneReconfiguringVertex();
        Assert.assertEquals(0L, testUpdateListener.events.size());
        EdgeProperty create2 = EdgeProperty.create(create, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In"));
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("B", create2);
        vertexImpl3.reconfigureVertex(2, this.vertexLocationHint, newHashMap);
        this.dispatcher.await();
        Assert.assertEquals(1L, testUpdateListener.events.size());
        Assert.assertEquals(vertexImpl2.getName(), testUpdateListener.events.get(0).getVertexName());
        Assert.assertEquals(org.apache.tez.dag.api.event.VertexState.CONFIGURED, testUpdateListener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates(vertexImpl2.getName(), testUpdateListener);
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
        Assert.assertNotNull(vertexImpl.getTask(0));
        Assert.assertNotNull(vertexImpl2.getTask(0));
        Assert.assertNotNull(vertexImpl3.getTask(0));
    }

    @Test(timeout = 5000)
    public void testInitStartRace() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan(false);
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
    }

    @Test(timeout = 5000)
    public void testInitStartRace2() throws TezException {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan2();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
    }

    @Test(timeout = 5000)
    public void testTez2684() throws IOException, TezException {
        setupPreDagCreation();
        this.dagPlan = createSamplerDAGPlan2();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.NEW, vertexImpl2.getState());
        Assert.assertEquals(VertexState.NEW, vertexImpl3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITED, vertexImpl3.getState());
        sendTaskGeneratedEvent(getVertexManagerEvent(new long[]{100000000}, 1060000000L, vertexImpl2), EventMetaData.EventProducerConsumerType.INPUT, vertexImpl3, vertexImpl2);
        Assert.assertEquals(VertexState.INITED, vertexImpl3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
    }

    @Test(timeout = 5000)
    public void testVertexGraceParallelism() throws IOException, TezException {
        setupPreDagCreation();
        this.dagPlan = createDAGPlanForGraceParallelism();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("A");
        VertexImpl vertexImpl2 = this.vertices.get("B");
        VertexImpl vertexImpl3 = this.vertices.get("C");
        initVertex(vertexImpl);
        Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
        Assert.assertEquals(VertexState.INITED, vertexImpl2.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        sendTaskGeneratedEvent(getVertexManagerEvent(new long[]{100000000}, 1060000000L, vertexImpl3), EventMetaData.EventProducerConsumerType.OUTPUT, vertexImpl2, vertexImpl3);
        Assert.assertEquals(VertexState.INITIALIZING, vertexImpl3.getState());
        startVertex(vertexImpl);
        completeAllTasksSuccessfully(vertexImpl);
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertEquals(VertexState.RUNNING, vertexImpl3.getState());
    }

    private void sendTaskGeneratedEvent(Event event, EventMetaData.EventProducerConsumerType eventProducerConsumerType, Vertex vertex, Vertex vertex2) {
        sendVertexEventRouteEvent(vertex, new TezEvent(event, new EventMetaData(eventProducerConsumerType, vertex.getName(), vertex2.getName(), TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertex.getVertexId(), 1), 1))));
    }

    private void sendVertexEventRouteEvent(Vertex vertex, TezEvent... tezEventArr) {
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertex.getVertexId(), Arrays.asList(tezEventArr)));
        this.dispatcher.await();
    }

    private VertexManagerEvent getVertexManagerEvent(long[] jArr, long j, Vertex vertex) throws IOException {
        ByteBuffer asReadOnlyByteBuffer;
        if (jArr != null) {
            RoaringBitmap partitionStatsForPhysicalOutput = ShuffleUtils.getPartitionStatsForPhysicalOutput(jArr);
            DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
            partitionStatsForPhysicalOutput.serialize(dataOutputBuffer);
            asReadOnlyByteBuffer = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(j).setPartitionStats(TezCommonUtils.compressByteArrayToByteString(dataOutputBuffer.getData())).build().toByteString().asReadOnlyByteBuffer();
        } else {
            asReadOnlyByteBuffer = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(j).build().toByteString().asReadOnlyByteBuffer();
        }
        return VertexManagerEvent.create(vertex.getName(), asReadOnlyByteBuffer);
    }

    @Test(timeout = 5000)
    public void testVMEventBeforeVertexInitialized() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithCountingVM();
        setupPostDagCreation();
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        VertexImpl vertexImpl3 = this.vertices.get("vertex3");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(VertexState.NEW, vertexImpl3.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), Collections.singletonList(new TezEvent(VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", (String) null, TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl.getVertexId(), 1), 1))))));
        this.dispatcher.await();
        Assert.assertEquals(1L, vertexImpl3.pendingVmEvents.size());
        Assert.assertEquals(0L, InvocationCountingVertexManager.numVmEventsReceived.get());
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImpl2.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl3.getState());
        Assert.assertEquals(0L, vertexImpl3.pendingVmEvents.size());
        Assert.assertEquals(1L, InvocationCountingVertexManager.numVmEventsReceived.get());
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), Collections.singletonList(new TezEvent(VertexManagerEvent.create("vertex3", ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", (String) null, TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImpl.getVertexId(), 1), 2))))));
        this.dispatcher.await();
        Assert.assertEquals(0L, vertexImpl3.pendingVmEvents.size());
        Assert.assertEquals(2L, InvocationCountingVertexManager.numVmEventsReceived.get());
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_Initialize() throws TezException {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.Initialize);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.Initialize.name()));
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertexImplWithControlledInitializerManager.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_OnRootVertexInitialized() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnRootVertexInitialized);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled rootInputInitializerManager = vertexImplWithControlledInitializerManager.getRootInputInitializerManager();
        rootInputInitializerManager.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, vertexImplWithControlledInitializerManager.vertexManager.getPlugin().getClass());
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertTrue(rootInputInitializerManager.hasShutDown);
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.OnRootVertexInitialized.name()));
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertexImplWithControlledInitializerManager.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_OnVertexStarted() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnVertexStarted);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization();
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, vertexImplWithControlledInitializerManager.vertexManager.getPlugin().getClass());
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.OnVertexStarted.name()));
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertexImplWithControlledInitializerManager.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnSourceTaskCompleted);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization();
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getEventHandler().handle(new VertexEventTaskAttemptCompleted(TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithControlledInitializerManager.getVertexId(), 0), 0), TaskAttemptStateInternal.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, vertexImplWithControlledInitializerManager.vertexManager.getPlugin().getClass());
        Assert.assertEquals(VertexManagerWithException.class, vertexImpl.vertexManager.getPlugin().getClass());
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.OnSourceTaskCompleted.name()));
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertexImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnVertexManagerEventReceived);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization();
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithControlledInitializerManager.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(VertexManagerEvent.create(vertexImplWithControlledInitializerManager.getName(), ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, vertexImplWithControlledInitializerManager.getName(), (String) null, TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexImplWithControlledInitializerManager.getVertexId(), 0), 0)))})));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.OnVertexManagerEventReceived.name()));
    }

    @Test(timeout = 5000)
    public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithVMException("TestVMStateUpdate", VertexManagerWithException.VMExceptionLocation.OnVertexManagerVertexStateUpdated);
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle(new VertexEvent(vertexImplWithControlledInitializerManager.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImpl.getState());
        startVertex(vertexImplWithControlledInitializerManager, false);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertTrue(StringUtils.join(vertexImpl.getDiagnostics(), ",").contains(VertexManagerWithException.VMExceptionLocation.OnVertexManagerVertexStateUpdated.name()));
        Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertexImpl.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_Initialize() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize);
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithIIException();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        initVertex(vertexImplWithRunningInputInitializer);
        while (vertexImplWithRunningInputInitializer.getState() != VertexState.FAILED) {
            Thread.sleep(10L);
        }
        Assert.assertTrue(StringUtils.join(vertexImplWithRunningInputInitializer.getDiagnostics(), ",").contains(IIExceptionLocation.Initialize.name()));
        Assert.assertEquals(VertexState.FAILED, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithRunningInputInitializer.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_InitFailedAfterInitialized() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithIIException();
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        initVertex(vertexImplWithControlledInitializerManager);
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization(0);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.INITED, vertexImplWithControlledInitializerManager.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventRootInputFailed(vertexImplWithControlledInitializerManager.getVertexId(), "input1", new AMUserCodeException(AMUserCodeException.Source.InputInitializer, new Exception("ErrorWhenInitFailureAtInited"))));
        this.dispatcher.await();
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains("ErrorWhenInitFailureAtInited"));
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithControlledInitializerManager.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_InitFailedAfterRunning() throws Exception {
        this.useCustomInitializer = true;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithIIException();
        setupPostDagCreation();
        VertexImplWithControlledInitializerManager vertexImplWithControlledInitializerManager = (VertexImplWithControlledInitializerManager) this.vertices.get("vertex1");
        initVertex(vertexImplWithControlledInitializerManager);
        vertexImplWithControlledInitializerManager.getRootInputInitializerManager().completeInputInitialization(0);
        this.dispatcher.await();
        startVertex(vertexImplWithControlledInitializerManager);
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithControlledInitializerManager.getState());
        this.dispatcher.getEventHandler().handle(new VertexEventRootInputFailed(vertexImplWithControlledInitializerManager.getVertexId(), "input1", new AMUserCodeException(AMUserCodeException.Source.InputInitializer, new Exception("ErrorWhenInitFailureAtRunning"))));
        this.dispatcher.await();
        Assert.assertTrue(StringUtils.join(vertexImplWithControlledInitializerManager.getDiagnostics(), ",").contains("ErrorWhenInitFailureAtRunning"));
        Assert.assertEquals(VertexState.FAILED, vertexImplWithControlledInitializerManager.getState());
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithControlledInitializerManager.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_HandleInputInitializerEvent() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        initVertex(vertexImplWithRunningInputInitializer);
        startVertex(vertexImplWithRunningInputInitializer);
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.INITIALIZING, vertexImplWithRunningInputInitializer2.getState());
        this.dispatcher.await();
        while (!eventHandlingRootInputInitializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        Assert.assertFalse(eventHandlingRootInputInitializer.eventReceived.get());
        Assert.assertFalse(eventHandlingRootInputInitializer.initComplete.get());
        InputInitializerEvent create = InputInitializerEvent.create("vertex2", "input1", (ByteBuffer) null);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImplWithRunningInputInitializer.getVertexId(), 0);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, 0);
        TezEvent tezEvent = new TezEvent(create, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", tezTaskAttemptID));
        this.dispatcher.getEventHandler().handle(new TaskEvent(tezTaskID, TaskEventType.T_ATTEMPT_LAUNCHED));
        this.dispatcher.getEventHandler().handle(new TaskEventTASucceeded(tezTaskAttemptID));
        this.dispatcher.getEventHandler().handle(new VertexEventRouteEvent(vertexImplWithRunningInputInitializer.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertTrue(StringUtils.join(vertexImplWithRunningInputInitializer2.getDiagnostics(), ",").contains(IIExceptionLocation.HandleInputInitializerEvent.name()));
        Assert.assertEquals(VertexState.FAILED, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithRunningInputInitializer2.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        initVertex(vertexImplWithRunningInputInitializer);
        while (!eventHandlingRootInputInitializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.FAILED, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertTrue(StringUtils.join(vertexImplWithRunningInputInitializer2.getDiagnostics(), ",").contains(IIExceptionLocation.OnVertexStateUpdated.name()));
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithRunningInputInitializer2.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testExceptionFromII_InitSucceededAfterInitFailure() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
        EventHandlingRootInputInitializer eventHandlingRootInputInitializer = (EventHandlingRootInputInitializer) this.customInitializer;
        setupPreDagCreation();
        this.dagPlan = createDAGPlanWithRunningInitializer();
        setupPostDagCreation();
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer vertexImplWithRunningInputInitializer2 = (VertexImplWithRunningInputInitializer) this.vertices.get("vertex2");
        initVertex(vertexImplWithRunningInputInitializer);
        while (!eventHandlingRootInputInitializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        startVertex(vertexImplWithRunningInputInitializer);
        this.dispatcher.getEventHandler().handle(new VertexEventRootInputInitialized(vertexImplWithRunningInputInitializer2.getVertexId(), "input1", (List) null));
        Assert.assertEquals(VertexState.RUNNING, vertexImplWithRunningInputInitializer.getState());
        Assert.assertEquals(VertexState.FAILED, vertexImplWithRunningInputInitializer2.getState());
        Assert.assertTrue(StringUtils.join(vertexImplWithRunningInputInitializer2.getDiagnostics(), ",").contains(IIExceptionLocation.OnVertexStateUpdated.name()));
        Assert.assertEquals(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, vertexImplWithRunningInputInitializer2.getTerminationCause());
    }

    @Test(timeout = 5000)
    public void testCompletedStatsCache() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        this.dispatcher.getEventHandler().handle(new TaskEventTALaunched(TezTaskAttemptID.getInstance(tezTaskID, 0)));
        this.dispatcher.getEventHandler().handle(new TaskEventTASucceeded(TezTaskAttemptID.getInstance(tezTaskID, 0)));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        vertexImpl.getStatistics();
        Assert.assertTrue(vertexImpl.completedTasksStatsCache.taskSet.get(0));
        this.dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(tezTaskID));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertTrue(vertexImpl.completedTasksStatsCache.taskSet.cardinality() == 0);
    }

    @Test(timeout = 5000)
    public void testRouteEvent_RecoveredEvent() throws IOException {
        ((AppContext) Mockito.doReturn(this.historyEventHandler).when(this.appContext)).getHistoryHandler();
        ((AppContext) Mockito.doReturn(true).when(this.appContext)).isRecoveryEnabled();
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        VertexImpl vertexImpl2 = this.vertices.get("vertex2");
        VertexImpl vertexImpl3 = this.vertices.get("vertex3");
        startVertex(vertexImpl);
        startVertex(vertexImpl2);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        vertexImpl.handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        vertexImpl.handle(new VertexEventRouteEvent(vertexImpl.getVertexId(), Lists.newArrayList(new TezEvent[]{new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", TezTaskAttemptID.getInstance(tezTaskID, 0)))})));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl3.pendingTaskEvents.size() != 0);
        vertexImpl3.scheduleTasks(Lists.newArrayList(new VertexManagerPluginContext.ScheduleTaskRequest[]{VertexManagerPluginContext.ScheduleTaskRequest.create(0, (TaskLocationHint) null)}));
        this.dispatcher.await();
        Assert.assertTrue(vertexImpl3.pendingTaskEvents.size() == 0);
    }

    private void verifyHistoryEvents(List<DAGHistoryEvent> list, HistoryEventType historyEventType, int i) {
        int i2 = 0;
        LOG.info("");
        for (DAGHistoryEvent dAGHistoryEvent : list) {
            LOG.info(dAGHistoryEvent.getHistoryEvent().getEventType() + "");
            if (dAGHistoryEvent.getHistoryEvent().getEventType() == historyEventType) {
                i2++;
            }
        }
        Assert.assertEquals(i2, i);
    }

    @Test(timeout = 5000)
    public void testCounterLimits() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        for (int i = 0; i < 2; i++) {
            TezCounters tezCounters = new TezCounters();
            for (int i2 = 0; i2 < 75; i2++) {
                tezCounters.findCounter("g", "c" + i + "_" + i2).increment(1L);
            }
            vertexImpl.getTask(i).setCounters(tezCounters);
        }
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertexImpl.getState());
        Assert.assertEquals(1L, vertexImpl.getCompletedTasks());
        Assert.assertTrue(0.5f == vertexImpl.getCompletedTaskProgress());
        this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(tezTaskID2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.FAILED, vertexImpl.getState());
        Assert.assertEquals(2L, vertexImpl.getCompletedTasks());
        System.out.println(vertexImpl.getDiagnostics());
        Assert.assertTrue("Diagnostics should contain counter limits error message", StringUtils.join(vertexImpl.getDiagnostics(), ",").contains("Counters limit exceeded"));
    }

    @Test(timeout = 5000)
    public void testFirstTaskStartTime() {
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        Assert.assertEquals(vertexImpl.getFirstTaskStartTime(), -1L);
        vertexImpl.reportTaskStartTime(100L);
        Assert.assertEquals(vertexImpl.getFirstTaskStartTime(), 100L);
        vertexImpl.reportTaskStartTime(50L);
        Assert.assertEquals(vertexImpl.getFirstTaskStartTime(), 50L);
        vertexImpl.reportTaskStartTime(200L);
        Assert.assertEquals(vertexImpl.getFirstTaskStartTime(), 50L);
    }

    @Test(timeout = 5000)
    public void testLastTaskFinishTime() {
        NodeId newInstance = NodeId.newInstance("127.0.0.1", 0);
        ContainerId newContainerId = ContainerId.newContainerId(this.appAttemptId, 3L);
        Container container = (Container) Mockito.mock(Container.class);
        Mockito.when(container.getId()).thenReturn(newContainerId);
        Mockito.when(container.getNodeId()).thenReturn(newInstance);
        Mockito.when(container.getNodeHttpAddress()).thenReturn("localhost:0");
        AMContainerMap aMContainerMap = new AMContainerMap((ContainerHeartbeatHandler) Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface) Mockito.mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), this.appContext);
        aMContainerMap.addContainerIfNew(container, 0, 0, 0);
        ((AppContext) Mockito.doReturn(aMContainerMap).when(this.appContext)).getAllContainers();
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex2");
        startVertex(vertexImpl);
        TezTaskID tezTaskID = TezTaskID.getInstance(vertexImpl.getVertexId(), 0);
        TezTaskID tezTaskID2 = TezTaskID.getInstance(vertexImpl.getVertexId(), 1);
        TaskImpl task = vertexImpl.getTask(tezTaskID);
        TaskImpl task2 = vertexImpl.getTask(tezTaskID2);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(task.getTaskId(), 0);
        TezTaskAttemptID tezTaskAttemptID2 = TezTaskAttemptID.getInstance(task2.getTaskId(), 0);
        TaskAttemptImpl attempt = task.getAttempt(tezTaskAttemptID);
        TaskAttemptImpl attempt2 = task2.getAttempt(tezTaskAttemptID2);
        Assert.assertEquals(vertexImpl.getLastTaskFinishTime(), -1L);
        attempt.handle(new TaskAttemptEventSchedule(tezTaskAttemptID, 0, 0));
        attempt.handle(new TaskAttemptEventSubmitted(tezTaskAttemptID, newContainerId));
        attempt.handle(new TaskAttemptEventStartedRemotely(tezTaskAttemptID));
        attempt.handle(new TaskAttemptEvent(tezTaskAttemptID, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals(vertexImpl.getLastTaskFinishTime(), -1L);
        attempt2.handle(new TaskAttemptEventSchedule(tezTaskAttemptID2, 0, 0));
        attempt2.handle(new TaskAttemptEventSubmitted(tezTaskAttemptID2, newContainerId));
        attempt2.handle(new TaskAttemptEventStartedRemotely(tezTaskAttemptID2));
        attempt2.handle(new TaskAttemptEvent(tezTaskAttemptID2, TaskAttemptEventType.TA_DONE));
        this.dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertexImpl.getState());
        Assert.assertTrue(vertexImpl.getLastTaskFinishTime() > 0);
    }

    @Test(timeout = 5000)
    public void testPickupDagLocalResourceOnScheduleTask() {
        initAllVertices(VertexState.INITED);
        VertexImpl vertexImpl = this.vertices.get("vertex1");
        startVertex(vertexImpl);
        TezTaskAttemptID tezTaskAttemptID = TezTaskAttemptID.getInstance(vertexImpl.getTask(0).getTaskId(), 0);
        vertexImpl.getTask(0).getAttempt(tezTaskAttemptID).handle(new TaskAttemptEventSchedule(tezTaskAttemptID, 1, 1));
        this.dispatcher.await();
        Assert.assertEquals(1L, this.amSchedulerEventDispatcher.events.size());
        Map localResources = this.amSchedulerEventDispatcher.events.get(0).getContainerContext().getLocalResources();
        Assert.assertTrue(localResources.containsKey("dag lr"));
        Assert.assertTrue(localResources.containsKey("vertex lr"));
    }

    static {
        Limits.reset();
        Configuration configuration = new Configuration(false);
        configuration.setInt("tez.counters.max", 100);
        configuration.setInt("tez.counters.max.groups", 100);
        Limits.setConfiguration(configuration);
    }
}
