package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerUtils;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManagerBase.class */
public class TestShuffleVertexManagerBase extends TestShuffleVertexManagerUtils {
    /**
     * Completions argument passed to {@code onVertexStarted(...)} by the tests in this class.
     * It is initialised to {@code null} and is not reassigned in the visible tests.
     */
    List<TaskAttemptIdentifier> emptyCompletions = null;
    /**
     * Manager implementation under test for the current parameterized run; set by the constructor.
     * NOTE(review): presumably consumed by a manager-creation helper outside this view - confirm.
     */
    Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass;

    /**
     * Supplies the constructor arguments for each parameterized run, one run per
     * shuffle vertex manager implementation.
     *
     * @return a fixed-size collection of single-element argument arrays, in the order
     *         {@code ShuffleVertexManager}, {@code FairShuffleVertexManager}
     */
    @Parameterized.Parameters(name = "test[{0}]")
    public static Collection<Object[]> data() {
        Object[] shuffleCase = {ShuffleVertexManager.class};
        Object[] fairShuffleCase = {FairShuffleVertexManager.class};
        return Arrays.asList(shuffleCase, fairShuffleCase);
    }

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param cls the manager class this run is parameterized with (one of the entries
     *            returned by {@link #data()})
     */
    public TestShuffleVertexManagerBase(Class<? extends ShuffleVertexManagerBase> cls) {
        shuffleVertexManagerClass = cls;
    }

    /**
     * Covers the case where "Vertex1" and "Vertex2" are created with zero tasks and
     * {@code onVertexStarted} is delivered before any source vertex reports CONFIGURED.
     *
     * <p>Expected sequence: a reconfiguration is planned up front, tasks stay pending until all
     * three source vertices are CONFIGURED, and then the vertex is reconfigured to a parallelism
     * of 1 with exactly one entry recorded in the scheduled-task list.
     */
    @Test(timeout = 5000)
    public void testZeroSourceTasksWithVertexStartedFirst() {
        Configuration conf = new Configuration();
        // Presumably collects the tasks scheduled through the mocked context - confirm in the utils class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 0, "Vertex2", 0, "Vertex3", 1, "Vertex4", 4, scheduledTasks, null);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));

        // Vertex start arrives first: nothing may be scheduled yet.
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();
        Assert.assertEquals(2L, manager.bipartiteSources);
        Assert.assertFalse(manager.pendingTasks.isEmpty());

        // Once every source vertex is configured, the pending tasks are released.
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertEquals(1L, scheduledTasks.size());
    }

    /**
     * Covers the case where "Vertex1" and "Vertex2" are created with zero tasks and the
     * CONFIGURED state updates of all source vertices are delivered before
     * {@code onVertexStarted}.
     *
     * <p>Expected sequence: a reconfiguration is planned as soon as the manager is created, the
     * state updates alone neither finish the reconfiguration nor schedule anything, and the
     * subsequent vertex start reconfigures the vertex to a parallelism of 1 with exactly one
     * entry recorded in the scheduled-task list.
     */
    @Test(timeout = 5000)
    public void testZeroSourceTasksWithVertexStateUpdatedFirst() {
        Configuration conf = new Configuration();
        // Presumably collects the tasks scheduled through the mocked context - confirm in the utils class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 0, "Vertex2", 0, "Vertex3", 1, "Vertex4", 4, scheduledTasks, null);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();

        // State updates arrive first: they must not trigger scheduling on their own.
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Mockito.verify(mockContext, Mockito.never()).doneReconfiguringVertex();
        Assert.assertEquals(0L, scheduledTasks.size());

        // The vertex start completes the reconfiguration and releases the pending tasks.
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(2L, manager.bipartiteSources);
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(1L, scheduledTasks.size());
    }

    /**
     * Checks that a vertex manager event delivered before {@code onVertexStarted} is not
     * counted immediately: {@code numVertexManagerEventsReceived} stays at 0 until the vertex is
     * started and is 1 afterwards.
     *
     * @throws IOException declared for the event-construction helper
     */
    @Test(timeout = 5000)
    public void testVMEventFirst() throws IOException {
        Configuration conf = new Configuration();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, Lists.newLinkedList(), null);
        VertexManagerEvent vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        // Attribute the event to the first attempt of a "Vertex1" task.
        TezTaskAttemptID producerAttemptId = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
        TaskAttemptIdentifierImpl producer = new TaskAttemptIdentifierImpl("dag", "Vertex1", producerAttemptId);
        vmEvent.setProducerAttemptIdentifier(producer);

        VertexStateUpdate vertex1Configured = new VertexStateUpdate("Vertex1", VertexState.CONFIGURED);
        manager.onVertexStateUpdated(vertex1Configured);

        // Event arrives before the vertex has started: it must not be counted yet.
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(0L, manager.numVertexManagerEventsReceived);

        // After the start the earlier event shows up in the counter.
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(1L, manager.numVertexManagerEventsReceived);
    }

    /**
     * Checks the per-partition statistics a manager exposes through
     * {@code getCurrentlyKnownStatsAtIndex} after receiving a vertex manager event, for both
     * values of the boolean flag accepted by the four-argument {@code getVertexManagerEvent}.
     *
     * <p>With the flag {@code false} the stats for partitions 2 and 3 are expected to be 100 and
     * 10; with the flag {@code true} they are expected to be 964 and 48. Partitions 0 and 1 are
     * expected to be 0 and 1 in both cases.
     *
     * @throws IOException declared for the event-construction helper
     */
    @Test(timeout = 5000)
    public void testPartitionStats() throws IOException {
        Configuration conf = new Configuration();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, Lists.newLinkedList(), null);
        // Numerically these are 0, 1 * 2^20, 964 * 2^20 and 48 * 2^20.
        long[] partitionSizes = {0, 1048576, 1010827264, 50331648};
        assertPartitionStatsFromEvent(conf, mockContext, partitionSizes, false, 100L, 10L);
        assertPartitionStatsFromEvent(conf, mockContext, partitionSizes, true, 964L, 48L);
    }

    /**
     * Creates a fresh manager, starts it, and delivers the same event twice: first attributed to
     * attempt 0 and then to attempt 1 of the same "Vertex1" task. After each delivery the event
     * counter must be 1 and the known stats must match the expectations, i.e. the second attempt
     * must not change anything.
     *
     * @param conf                  configuration used to create the manager
     * @param mockContext           mocked plugin context shared by the scenarios
     * @param partitionSizes        per-partition sizes handed to {@code getVertexManagerEvent}
     * @param eventFlag             boolean passed as the last argument of
     *                              {@code getVertexManagerEvent}
     *                              (NOTE(review): presumably selects detailed stats reporting - confirm)
     * @param expectedStatsAtIndex2 expected value of {@code getCurrentlyKnownStatsAtIndex(2)}
     * @param expectedStatsAtIndex3 expected value of {@code getCurrentlyKnownStatsAtIndex(3)}
     * @throws IOException declared for the event-construction helper
     */
    private void assertPartitionStatsFromEvent(Configuration conf, VertexManagerPluginContext mockContext, long[] partitionSizes, boolean eventFlag, long expectedStatsAtIndex2, long expectedStatsAtIndex3) throws IOException {
        VertexManagerEvent vmEvent = getVertexManagerEvent(partitionSizes, 0L, "Vertex", eventFlag);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        String[] attemptIds = {
            "attempt_1436907267600_195589_1_00_000000_0",
            "attempt_1436907267600_195589_1_00_000000_1",
        };
        for (String attemptId : attemptIds) {
            vmEvent.setProducerAttemptIdentifier(new TaskAttemptIdentifierImpl("dag", "Vertex1", TezTaskAttemptID.fromString(attemptId)));
            manager.onVertexManagerEventReceived(vmEvent);
            Assert.assertEquals(1L, manager.numVertexManagerEventsReceived);
            Assert.assertEquals(0L, manager.getCurrentlyKnownStatsAtIndex(0));
            Assert.assertEquals(1L, manager.getCurrentlyKnownStatsAtIndex(1));
            Assert.assertEquals(expectedStatsAtIndex2, manager.getCurrentlyKnownStatsAtIndex(2));
            Assert.assertEquals(expectedStatsAtIndex3, manager.getCurrentlyKnownStatsAtIndex(3));
        }
    }

    /**
     * Exercises when auto-parallelism is (and is not) applied as source output sizes and
     * completions are reported. NOTE(review): the name suggests a regression test for issue
     * TEZ-978 - confirm.
     *
     * <p>Part one (sources with 2 tasks each): two small events (size 1 each) do not trigger
     * {@code determineParallelismAndApply}; a further event of {@code 160 * MB} does, and the
     * vertex is reconfigured exactly once, to a parallelism of 2.
     *
     * <p>Part two (sources mocked to 20 tasks each, destination 40): a second manager on the same
     * mocked context does not reconfigure while 8 "Vertex1" and 3 "Vertex2" tasks complete; the
     * next "Vertex2" completion produces the second reconfiguration overall, again to 2.
     *
     * @throws IOException declared for the event-construction helper
     */
    @Test(timeout = 5000)
    public void testTez978() throws IOException {
        Configuration conf = new Configuration();
        // Presumably collects the tasks scheduled through the mocked context - confirm in the utils class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, null);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(4L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        // First small output from Vertex1: parallelism must not be determined yet.
        manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 1L, "Vertex1"));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertFalse(manager.determineParallelismAndApply(0.0f));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());
        Assert.assertEquals(1L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1L, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);

        // Second small output, from Vertex2: still no parallelism decision.
        manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 1L, "Vertex2"));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertFalse(manager.determineParallelismAndApply(0.25f));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());
        Assert.assertEquals(2L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2L, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);

        // A large output is reported: now parallelism is determined and applied exactly once.
        manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 160 * MB, "Vertex2"));
        Assert.assertTrue(manager.determineParallelismAndApply(0.25f));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        // Same task as before completes again; the completed-task count is expected to stay at 2.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(2L, scheduledTasks.size());
        Assert.assertEquals(2L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(3L, manager.numVertexManagerEventsReceived);
        Assert.assertEquals((160 * MB) + 2, manager.completedSourceTasksOutputSize);

        // Part two: larger vertices on the same mocked context, with a fresh manager.
        Mockito.when(mockContext.getVertexNumTasks("Vertex1")).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks("Vertex2")).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks("Vertex4")).thenReturn(40);
        scheduledTasks.clear();
        ShuffleVertexManagerBase largeManager = createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(0.2f));
        // Creating the new manager must not have reconfigured anything: the count is still 1.
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        largeManager.onVertexStarted(this.emptyCompletions);
        largeManager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        largeManager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        largeManager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertEquals(40L, largeManager.pendingTasks.size());
        Assert.assertEquals(40L, largeManager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, largeManager.numBipartiteSourceTasksCompleted);

        // Eight Vertex1 tasks report output and complete: no additional reconfiguration.
        for (int task = 0; task < 8; task++) {
            largeManager.onVertexManagerEventReceived(getVertexManagerEvent(null, 10 * MB, "Vertex1"));
            largeManager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", task));
            Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        // Three Vertex2 tasks complete without reporting output: still no reconfiguration.
        for (int task = 0; task < 3; task++) {
            largeManager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", task));
            Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        // One more Vertex2 completion triggers the second reconfiguration, again to 2 tasks.
        largeManager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 8));
        Mockito.verify(mockContext, Mockito.times(2)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(2)).reconfigureVertex(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
    }

    /**
     * Verifies auto-parallelism end to end: after two source tasks have each reported
     * {@code 50 * MB} of output, the vertex is reconfigured exactly once to a parallelism of 2,
     * tasks 0 and 1 are scheduled, and two edge managers are registered. One of those edge
     * managers is then checked for its physical input/output counts and event routing.
     *
     * @throws Exception declared broadly; the routing calls on the edge manager may throw
     */
    @Test(timeout = 5000)
    public void testAutoParallelism() throws Exception {
        Configuration conf = new Configuration();
        // Presumably collects the tasks scheduled through the mocked context - confirm in the utils class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        // Presumably receives the edge managers passed to reconfigureVertex - confirm in the utils class.
        // Kept raw because the helper's parameter type is not visible here.
        HashMap edgeManagers = new HashMap();
        VertexManagerPluginContext mockContext = createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, edgeManagers);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, Float.valueOf(0.5f), Float.valueOf(0.5f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(4L, manager.totalNumBipartiteSourceTasks);

        // A Vertex3 completion does not count towards the bipartite completions.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex3", 0));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(4L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        // First bipartite completion with 50 MB of output: nothing is scheduled yet.
        manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 50 * MB, "Vertex1"));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());
        Assert.assertEquals(1L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1L, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(50 * MB, manager.completedSourceTasksOutputSize);

        // The same task completing again must not change any counters.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(4L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());
        Assert.assertEquals(1L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(50 * MB, manager.completedSourceTasksOutputSize);

        // Second bipartite completion: parallelism is reduced to 2 and both tasks are scheduled.
        manager.onVertexManagerEventReceived(getVertexManagerEvent(null, 50 * MB, "Vertex2"));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2L, edgeManagers.size());
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(2L, scheduledTasks.size());
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(0)));
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(1)));
        Assert.assertEquals(2L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2L, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(100 * MB, manager.completedSourceTasksOutputSize);

        // A further completion must not reconfigure the vertex a second time.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2L, edgeManagers.size());

        // The map is raw, so its values come back as Object and need an explicit cast.
        EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand) edgeManagers.values().iterator().next();
        Assert.assertEquals(4L, edgeManager.getNumSourceTaskPhysicalOutputs(0));
        Assert.assertEquals(4L, edgeManager.getNumDestinationTaskPhysicalInputs(0));

        EdgeManagerPluginOnDemand.EventRouteMetadata dataRoute = edgeManager.routeDataMovementEventToDestination(1, 1, 0);
        Assert.assertEquals(1L, dataRoute.getNumEvents());
        Assert.assertEquals(3L, dataRoute.getTargetIndices()[0]);

        dataRoute = edgeManager.routeDataMovementEventToDestination(0, 2, 1);
        Assert.assertEquals(1L, dataRoute.getNumEvents());
        Assert.assertEquals(0L, dataRoute.getTargetIndices()[0]);

        // A failed source task 1 is expected to map to target indices 2 and 3 for both destinations.
        EdgeManagerPluginOnDemand.EventRouteMetadata failureRoute = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
        Assert.assertEquals(2L, failureRoute.getNumEvents());
        Assert.assertEquals(2L, failureRoute.getTargetIndices()[0]);
        Assert.assertEquals(3L, failureRoute.getTargetIndices()[1]);

        failureRoute = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
        Assert.assertEquals(2L, failureRoute.getNumEvents());
        Assert.assertEquals(2L, failureRoute.getTargetIndices()[0]);
        Assert.assertEquals(3L, failureRoute.getTargetIndices()[1]);
    }

    @Test(timeout = 5000)
    public void testShuffleVertexManagerSlowStart() {
        Configuration configuration = new Configuration();
        HashMap hashMap = new HashMap();
        EdgeProperty create = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        EdgeProperty create2 = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        EdgeProperty create3 = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        VertexManagerPluginContext vertexManagerPluginContext = (VertexManagerPluginContext) Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(vertexManagerPluginContext.getVertexStatistics((String) Mockito.any(String.class))).thenReturn((VertexStatistics) Mockito.mock(VertexStatistics.class));
        Mockito.when(vertexManagerPluginContext.getInputVertexEdgeProperties()).thenReturn(hashMap);
        Mockito.when(vertexManagerPluginContext.getVertexName()).thenReturn("Vertex4");
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex4"))).thenReturn(3);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex3"))).thenReturn(1);
        hashMap.put("Vertex3", create3);
        try {
            createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.1f), Float.valueOf(0.1f)).onVertexStarted(this.emptyCompletions);
            Assert.assertFalse(true);
        } catch (TezUncheckedException e) {
            Assert.assertTrue(e.getMessage().contains("At least 1 bipartite source should exist"));
        }
        hashMap.put("Vertex1", create);
        hashMap.put("Vertex2", create2);
        ShuffleVertexManagerBase createManager = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        createManager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue(createManager.bipartiteSources == 2);
        LinkedList newLinkedList = Lists.newLinkedList();
        ((VertexManagerPluginContext) Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(newLinkedList)).when(vertexManagerPluginContext)).scheduleTasks(Mockito.anyList());
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(0);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(0);
        createManager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        createManager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue(createManager.pendingTasks.isEmpty());
        Assert.assertTrue(newLinkedList.size() == 3);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(2);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(2);
        try {
            createManager(configuration, vertexManagerPluginContext, Float.valueOf(-0.1f), Float.valueOf(0.0f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e2) {
            Assert.assertTrue(e2.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.0f), Float.valueOf(95.0f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e3) {
            Assert.assertTrue(e3.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.5f), Float.valueOf(0.3f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e4) {
            Assert.assertTrue(e4.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(20);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(20);
        newLinkedList.clear();
        ShuffleVertexManagerBase createManager2 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.8f), null);
        createManager2.onVertexStarted(this.emptyCompletions);
        createManager2.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager2.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager2.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertEquals(3L, createManager2.pendingTasks.size());
        Assert.assertEquals(20 * 2, createManager2.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, createManager2.numBipartiteSourceTasksCompleted);
        float f = 0.8f * 20;
        for (String str : new String[]{"Vertex1", "Vertex2"}) {
            for (int i = 0; i < vertexManagerPluginContext.getVertexNumTasks(str); i++) {
                createManager2.onSourceTaskCompleted(createTaskAttemptIdentifier(str, i + 1));
                if (i + 2 >= f) {
                    break;
                }
            }
        }
        Assert.assertEquals(createManager2.totalTasksToSchedule, createManager2.pendingTasks.size());
        Assert.assertEquals(0L, newLinkedList.size());
        createManager2.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(3L, createManager2.pendingTasks.size());
        Assert.assertEquals(0L, newLinkedList.size());
        createManager2.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(0L, createManager2.pendingTasks.size());
        Assert.assertEquals(createManager2.totalTasksToSchedule, newLinkedList.size());
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(2);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(2);
        ShuffleVertexManagerBase createManager3 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.0f), Float.valueOf(0.0f));
        createManager3.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue(createManager3.totalTasksToSchedule == 3);
        Assert.assertTrue(createManager3.numBipartiteSourceTasksCompleted == 0);
        createManager3.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager3.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager3.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager3.totalNumBipartiteSourceTasks == 4);
        Assert.assertTrue(createManager3.pendingTasks.isEmpty());
        Assert.assertTrue(newLinkedList.size() == 3);
        ShuffleVertexManagerBase createManager4 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.25f));
        createManager4.onVertexStarted(this.emptyCompletions);
        createManager4.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager4.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager4.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager4.pendingTasks.size() == 3);
        Assert.assertTrue(createManager4.totalNumBipartiteSourceTasks == 4);
        createManager4.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex3", 0));
        Assert.assertTrue(createManager4.pendingTasks.size() == 3);
        Assert.assertTrue(createManager4.totalNumBipartiteSourceTasks == 4);
        Assert.assertTrue(createManager4.numBipartiteSourceTasksCompleted == 0);
        createManager4.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager4.pendingTasks.size() == 3);
        Assert.assertTrue(createManager4.totalNumBipartiteSourceTasks == 4);
        Assert.assertTrue(createManager4.numBipartiteSourceTasksCompleted == 1);
        createManager4.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertTrue(createManager4.pendingTasks.isEmpty());
        Assert.assertTrue(newLinkedList.size() == 3);
        Assert.assertTrue(createManager4.numBipartiteSourceTasksCompleted == 2);
        ShuffleVertexManagerBase createManager5 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        createManager5.onVertexStarted(this.emptyCompletions);
        createManager5.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager5.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager5.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager5.pendingTasks.size() == 3);
        Assert.assertTrue(createManager5.totalNumBipartiteSourceTasks == 4);
        createManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex3", 0));
        Assert.assertTrue(createManager5.pendingTasks.size() == 3);
        Assert.assertTrue(createManager5.totalNumBipartiteSourceTasks == 4);
        Assert.assertTrue(createManager5.numBipartiteSourceTasksCompleted == 0);
        createManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertTrue(createManager5.pendingTasks.size() == 3);
        Assert.assertTrue(createManager5.numBipartiteSourceTasksCompleted == 1);
        createManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        Assert.assertTrue(createManager5.pendingTasks.size() == 3);
        Assert.assertTrue(createManager5.numBipartiteSourceTasksCompleted == 2);
        createManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager5.pendingTasks.size() == 3);
        Assert.assertTrue(createManager5.numBipartiteSourceTasksCompleted == 3);
        createManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertTrue(createManager5.pendingTasks.isEmpty());
        Assert.assertTrue(newLinkedList.size() == 3);
        Assert.assertTrue(createManager5.numBipartiteSourceTasksCompleted == 4);
        ShuffleVertexManagerBase createManager6 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        createManager6.onVertexStarted(this.emptyCompletions);
        createManager6.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager6.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager6.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager6.pendingTasks.size() == 3);
        Assert.assertTrue(createManager6.totalNumBipartiteSourceTasks == 4);
        createManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex3", 0));
        Assert.assertTrue(createManager6.pendingTasks.size() == 3);
        Assert.assertTrue(createManager6.totalNumBipartiteSourceTasks == 4);
        Assert.assertTrue(createManager6.numBipartiteSourceTasksCompleted == 0);
        createManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertTrue(createManager6.pendingTasks.size() == 3);
        Assert.assertTrue(createManager6.numBipartiteSourceTasksCompleted == 1);
        createManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        Assert.assertTrue(createManager6.pendingTasks.size() == 3);
        Assert.assertTrue(createManager6.numBipartiteSourceTasksCompleted == 2);
        createManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager6.pendingTasks.size() == 3);
        Assert.assertTrue(createManager6.numBipartiteSourceTasksCompleted == 3);
        createManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertTrue(createManager6.pendingTasks.isEmpty());
        Assert.assertTrue(newLinkedList.size() == 3);
        Assert.assertTrue(createManager6.numBipartiteSourceTasksCompleted == 4);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(4);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(4);
        ShuffleVertexManagerBase createManager7 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        createManager7.onVertexStarted(this.emptyCompletions);
        createManager7.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager7.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager7.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager7.pendingTasks.size() == 3);
        Assert.assertTrue(createManager7.totalNumBipartiteSourceTasks == 8);
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertTrue(createManager7.pendingTasks.size() == 3);
        Assert.assertTrue(createManager7.numBipartiteSourceTasksCompleted == 2);
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertTrue(createManager7.pendingTasks.size() == 3);
        Assert.assertTrue(createManager7.numBipartiteSourceTasksCompleted == 2);
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager7.pendingTasks.size() == 1);
        Assert.assertTrue(newLinkedList.size() == 2);
        Assert.assertTrue(createManager7.numBipartiteSourceTasksCompleted == 4);
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertTrue(createManager7.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(createManager7.numBipartiteSourceTasksCompleted == 6);
        newLinkedList.clear();
        createManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        Assert.assertTrue(createManager7.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 0);
        Assert.assertTrue(createManager7.numBipartiteSourceTasksCompleted == 7);
        ShuffleVertexManagerBase createManager8 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(1.0f));
        createManager8.onVertexStarted(this.emptyCompletions);
        createManager8.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager8.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager8.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager8.pendingTasks.size() == 3);
        Assert.assertTrue(createManager8.totalNumBipartiteSourceTasks == 8);
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager8.pendingTasks.size() == 2);
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(createManager8.numBipartiteSourceTasksCompleted == 4);
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertTrue(createManager8.pendingTasks.size() == 1);
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(createManager8.numBipartiteSourceTasksCompleted == 6);
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        createManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 3));
        Assert.assertTrue(createManager8.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(createManager8.numBipartiteSourceTasksCompleted == 8);
        newLinkedList.clear();
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex4"))).thenReturn(1);
        ShuffleVertexManagerBase createManager9 = createManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        createManager9.onVertexStarted(this.emptyCompletions);
        createManager9.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createManager9.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        createManager9.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue(createManager9.pendingTasks.size() == 1);
        Assert.assertTrue(createManager9.totalNumBipartiteSourceTasks == 8);
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertTrue(createManager9.pendingTasks.size() == 1);
        Assert.assertTrue(newLinkedList.size() == 0);
        Assert.assertTrue(createManager9.numBipartiteSourceTasksCompleted == 2);
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertTrue(createManager9.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(createManager9.numBipartiteSourceTasksCompleted == 4);
        newLinkedList.clear();
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertTrue(createManager9.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 0);
        Assert.assertTrue(createManager9.numBipartiteSourceTasksCompleted == 6);
        newLinkedList.clear();
        createManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        Assert.assertTrue(createManager9.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == 0);
        Assert.assertTrue(createManager9.numBipartiteSourceTasksCompleted == 7);
    }

    /**
     * Scenario test (named for TEZ-1649) in which every input edge of the managed vertex "R2" is
     * SCATTER_GATHER.
     *
     * <p>Scenario 1: sources R1, M2 and M3 each report 3 tasks. Only M2 and M3 are CONFIGURED at
     * first, so 6 bipartite source tasks are counted; the 3 pending tasks stay pending and
     * {@code reconfigureVertex} is never invoked while source tasks complete. Once R1 is CONFIGURED
     * the total becomes 9, {@code reconfigureVertex} is invoked exactly once with parallelism 1, the
     * pending tasks drain and the recording list holds 1 entry.
     *
     * <p>Scenario 2: R1 and M2 report 0 tasks and M3 reports 3. After all sources are CONFIGURED the
     * total is 3 and a single M3 completion drains all pending tasks, leaving 3 recorded entries.
     */
    @Test(timeout = 5000)
    public void test_Tez1649_with_scatter_gather_edges() throws IOException {
        Configuration conf = new Configuration();
        // The same value is used for both fraction arguments of every manager created below.
        Float fraction = Float.valueOf(0.001f);

        // Three distinct SCATTER_GATHER edges feeding R2.
        HashMap<String, EdgeProperty> inputEdges = new HashMap<String, EdgeProperty>();
        inputEdges.put("R1", EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));
        inputEdges.put("M2", EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));
        inputEdges.put("M3", EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));

        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(3);

        VertexManagerEvent r1Event = getVertexManagerEvent(null, 50L, "R1");
        ShuffleVertexManagerBase manager = createManager(conf, context, fraction, fraction);

        // Raw type kept on purpose: the element type expected by ScheduledTasksAnswer is not
        // visible from this class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(context).scheduleTasks(Mockito.anyList());

        // ---- Scenario 1: R1 is configured last ----
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(3L, manager.bipartiteSources);
        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(r1Event);
        Assert.assertEquals(3L, manager.pendingTasks.size());
        // Only the two configured sources (M2, M3) contribute to the total so far.
        Assert.assertEquals(6L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M3", 0));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M3", 1));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M3", 2));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(6L, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M2", 0));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M2", 1));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(6L, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 0));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(6L, manager.totalNumBipartiteSourceTasks);

        // No reconfiguration may have happened while R1 was still unconfigured.
        Mockito.verify(context, Mockito.times(0)).reconfigureVertex(
                Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());

        // From here on the mocked context reports a single task for R2.
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(1);
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        Assert.assertEquals(9L, manager.totalNumBipartiteSourceTasks);
        Mockito.verify(context, Mockito.times(1)).reconfigureVertex(
                Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(1L, scheduledTasks.size());
        scheduledTasks.clear();

        // ---- Scenario 2: R1 and M2 have no tasks, M3 has three ----
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(0);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(0);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(3);

        manager = createManager(conf, context, fraction, fraction);
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);
        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M3", 0));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(3L, scheduledTasks.size());
    }

    /**
     * Scenario test (named for TEZ-1649) in which the managed vertex "R2" has one SCATTER_GATHER
     * source (R1) and two BROADCAST sources (M2, M3). Only R1 counts as a bipartite source.
     *
     * <p>Scenario 1: all sources report 3 tasks. With R1 and M2 CONFIGURED, completions from R1 and
     * M2 leave the 3 pending tasks untouched; they drain once M3 is CONFIGURED.
     *
     * <p>Scenario 2: all three R1 tasks complete while only R1 is CONFIGURED; the pending tasks
     * drain only after M2 and M3 are CONFIGURED as well.
     *
     * <p>Scenario 3: M2 reports 0 tasks. Two R1 completions schedule nothing until M3 and M2 are
     * CONFIGURED.
     *
     * <p>Scenario 4: M2 and M3 both report 0 tasks. With every source CONFIGURED, a single R1
     * completion drains all pending tasks.
     */
    @Test(timeout = 5000)
    public void test_Tez1649_with_mixed_edges() {
        Configuration conf = new Configuration();
        // The same value is used for both fraction arguments of every manager created below.
        Float fraction = Float.valueOf(0.001f);

        HashMap<String, EdgeProperty> inputEdges = new HashMap<String, EdgeProperty>();
        inputEdges.put("R1", EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));
        inputEdges.put("M2", EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));
        inputEdges.put("M3", EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));

        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(3);

        // Raw type kept on purpose: the element type expected by ScheduledTasksAnswer is not
        // visible from this class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(context).scheduleTasks(Mockito.anyList());

        // ---- Scenario 1: M3 (broadcast) is configured last ----
        ShuffleVertexManagerBase manager = createManager(conf, context, fraction, fraction);
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(1L, manager.bipartiteSources);
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 0));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 1));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("M2", 0));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);

        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(3L, scheduledTasks.size());
        scheduledTasks.clear();

        // ---- Scenario 2: every R1 task finishes before the broadcast sources are configured ----
        manager = createManager(conf, context, fraction, fraction);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(3);
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 0));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 1));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 2));
        Assert.assertEquals(3L, manager.pendingTasks.size());

        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(3L, scheduledTasks.size());
        scheduledTasks.clear();

        // ---- Scenario 3: M2 (broadcast) has zero tasks ----
        // NOTE(review): this manager is started and then discarded without any assertion; it is
        // kept so the mocked context receives the same calls as before. Confirm whether it is needed.
        createManager(conf, context, fraction, fraction).onVertexStarted(this.emptyCompletions);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(0);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(3);

        manager = createManager(conf, context, fraction, fraction);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(3L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 0));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 1));
        Assert.assertEquals(3L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());

        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(3L, scheduledTasks.size());
        scheduledTasks.clear();

        // ---- Scenario 4: both broadcast sources have zero tasks ----
        manager = createManager(conf, context, fraction, fraction);
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("R1")).thenReturn(3);
        Mockito.when(context.getVertexNumTasks("M2")).thenReturn(0);
        Mockito.when(context.getVertexNumTasks("M3")).thenReturn(0);

        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier("R1", 0));
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(3L, scheduledTasks.size());
    }

    /**
     * Checks the case where the managed vertex "R2" reports zero tasks: after its single
     * SCATTER_GATHER source R1 is CONFIGURED, nothing is pending, nothing has been recorded by the
     * {@code scheduleTasks} answer, and {@code doneReconfiguringVertex()} has been invoked once on
     * the context.
     */
    @Test
    public void testZeroTasksSendsConfigured() throws IOException {
        Configuration conf = new Configuration();

        HashMap<String, EdgeProperty> inputEdges = new HashMap<String, EdgeProperty>();
        inputEdges.put("R1", EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"), InputDescriptor.create("in")));

        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(context.getVertexName()).thenReturn("R2");
        Mockito.when(context.getVertexNumTasks("R2")).thenReturn(0);

        ShuffleVertexManagerBase manager =
                createManager(conf, context, Float.valueOf(0.001f), Float.valueOf(0.001f));

        // Keep a handle on the list handed to ScheduledTasksAnswer so it can be asserted on below.
        // Raw type kept on purpose: the element type expected by ScheduledTasksAnswer is not
        // visible from this class.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(context).scheduleTasks(Mockito.anyList());

        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));

        Assert.assertEquals(1L, manager.bipartiteSources);
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(0L, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, manager.pendingTasks.size());
        Assert.assertEquals(0L, scheduledTasks.size());
        Mockito.verify(context).doneReconfiguringVertex();
    }

    /**
     * Verifies that a source-task completion passed to {@code onVertexStarted} is counted: the
     * completed-bipartite-source-task counter is 0 before the call and 1 after a single "Vertex1"
     * completion is supplied.
     */
    @Test(timeout = 5000)
    public void testTezDrainCompletionsOnVertexStart() throws IOException {
        Configuration conf = new Configuration();
        VertexManagerPluginContext context = createVertexManagerContext(
                "Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, Lists.newLinkedList(), null);
        ShuffleVertexManagerBase manager =
                createManager(conf, context, Float.valueOf(0.01f), Float.valueOf(0.75f));

        // Nothing has been reported to the manager yet.
        Assert.assertEquals(0L, manager.numBipartiteSourceTasksCompleted);

        TaskAttemptIdentifier alreadyCompleted =
                TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex1", 0);
        manager.onVertexStarted(Collections.singletonList(alreadyCompleted));

        Assert.assertEquals(1L, manager.numBipartiteSourceTasksCompleted);
    }

    /**
     * Builds a manager of the class selected by the current test parameter by delegating to the
     * seven-argument {@code createManager} overload (declared outside this chunk), passing
     * {@code true} and {@code null} for its fourth and fifth arguments.
     *
     * @param configuration configuration forwarded to the overload
     * @param vertexManagerPluginContext (usually mocked) plugin context forwarded to the overload
     * @param f first fraction argument; presumably the minimum source-completion fraction, TODO
     *     confirm against the overload
     * @param f2 second fraction argument; presumably the maximum source-completion fraction, TODO
     *     confirm against the overload
     * @return whatever the delegated overload returns
     */
    private ShuffleVertexManagerBase createManager(Configuration configuration, VertexManagerPluginContext vertexManagerPluginContext, Float f, Float f2) {
        final Class<? extends ShuffleVertexManagerBase> managerClass = this.shuffleVertexManagerClass;
        return createManager(managerClass, configuration, vertexManagerPluginContext, true, null, f, f2);
    }
}
