/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleVertexManager {
    /**
     * Exercises {@link ShuffleVertexManager} with auto-parallelism against a mocked
     * {@link VertexManagerPluginContext}.
     *
     * <p>The managed vertex ("Vertex4") has two SCATTER_GATHER sources ("Vertex1", "Vertex2")
     * and one BROADCAST source ("Vertex3"). The test walks through these scenarios in order:
     * <ol>
     *   <li>values set through the config builder are read back from the manager;</li>
     *   <li>scheduling when the scatter-gather sources have no tasks;</li>
     *   <li>parallelism is left alone when the reported output size is large;</li>
     *   <li>parallelism is reduced to 2 (and, for a 40-task vertex, to 4);</li>
     *   <li>the edge managers installed by the reduction route events as expected.</li>
     * </ol>
     *
     * <p>Mockito verification counts are cumulative over the shared mock, so the statement
     * order in this method is significant.
     *
     * <p>NOTE(review): {@code createManager} is defined outside this excerpt; its two float
     * arguments are presumably the slow-start min/max source completion fractions.
     */
    @Test(timeout=5000L)
    public void testShuffleVertexManagerAutoParallelism() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);
        ShuffleVertexManager manager;

        // Two scatter-gather sources and one broadcast source feed the managed vertex.
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String mockSrcVertexId3 = "Vertex3";
        EdgeProperty eProp3 = EdgeProperty.create(
                EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        // final: captured by the setVertexParallelism answer below.
        final String mockManagedVertexId = "Vertex4";
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);
        mockInputVertices.put(mockSrcVertexId3, eProp3);

        final VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);

        // --- Builder with auto-parallelism enabled: explicit values must reach the manager. ---
        ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer =
                ShuffleVertexManager.createConfigBuilder(null);
        VertexManagerPluginDescriptor pluginDesc = configurer
                .setAutoReduceParallelism(true)
                .setDesiredTaskInputSize(1000L)
                .setMinTaskParallelism(10)
                .setSlowStartMaxSrcCompletionFraction(0.5f)
                .build();
        Mockito.when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
        manager = (ShuffleVertexManager) ReflectionUtils.createClazzInstance(
                pluginDesc.getClassName(),
                new Class[]{VertexManagerPluginContext.class},
                new Object[]{mockContext});
        manager.initialize();
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();
        Assert.assertTrue(manager.enableAutoParallelism);
        Assert.assertTrue(manager.desiredTaskInputDataSize == 1000L);
        Assert.assertTrue(manager.minTaskParallelism == 10);
        // The min fraction was not set on the builder; 0.25 is presumably the default.
        Assert.assertTrue(manager.slowStartMinSrcCompletionFraction == 0.25f);
        Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction == 0.5f);

        // --- Builder with auto-parallelism disabled: everything else left unset. ---
        configurer = ShuffleVertexManager.createConfigBuilder(null);
        pluginDesc = configurer.setAutoReduceParallelism(false).build();
        Mockito.when(mockContext.getUserPayload()).thenReturn(pluginDesc.getUserPayload());
        manager = (ShuffleVertexManager) ReflectionUtils.createClazzInstance(
                pluginDesc.getClassName(),
                new Class[]{VertexManagerPluginContext.class},
                new Object[]{mockContext});
        manager.initialize();
        // Count is still 1: initializing without auto-parallelism did not plan a reconfiguration.
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();
        Assert.assertTrue(!manager.enableAutoParallelism);
        // 100 MiB (0x6400000); this and the values below are presumably the defaults.
        Assert.assertTrue(manager.desiredTaskInputDataSize == 100L * 1024 * 1024);
        Assert.assertTrue(manager.minTaskParallelism == 1);
        Assert.assertTrue(manager.slowStartMinSrcCompletionFraction == 0.25f);
        Assert.assertTrue(manager.slowStartMaxSrcCompletionFraction == 0.75f);

        // Record the task indices of the most recent scheduleVertexTasks call.
        final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List<?> tasks = (List<?>) args[0];
                for (Object task : tasks) {
                    scheduledTasks.add(((VertexManagerPluginContext.TaskWithLocationHint) task).getTaskIndex());
                }
                return null;
            }
        }).when(mockContext).scheduleVertexTasks(Mockito.anyList());

        // When parallelism is set to 2: report 2 tasks for the managed vertex from then on and
        // instantiate the edge manager described for each entry of the third argument.
        final HashMap<Object, EdgeManagerPlugin> newEdgeManagers = new HashMap<Object, EdgeManagerPlugin>();
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Exception {
                Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
                newEdgeManagers.clear();
                Map<?, ?> edgeDescriptors = (Map<?, ?>) invocation.getArguments()[2];
                for (Map.Entry<?, ?> entry : edgeDescriptors.entrySet()) {
                    EdgeManagerPluginDescriptor descriptor = (EdgeManagerPluginDescriptor) entry.getValue();
                    final UserPayload userPayload = descriptor.getUserPayload();
                    // Minimal context: 2 source tasks, 2 destination tasks, no vertex names.
                    EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() {
                        @Override
                        public UserPayload getUserPayload() {
                            return userPayload;
                        }

                        @Override
                        public String getSourceVertexName() {
                            return null;
                        }

                        @Override
                        public String getDestinationVertexName() {
                            return null;
                        }

                        @Override
                        public int getSourceVertexNumTasks() {
                            return 2;
                        }

                        @Override
                        public int getDestinationVertexNumTasks() {
                            return 2;
                        }
                    };
                    EdgeManagerPlugin edgeManager = (EdgeManagerPlugin) ReflectionUtils.createClazzInstance(
                            descriptor.getClassName(),
                            new Class[]{EdgeManagerPluginContext.class},
                            new Object[]{emContext});
                    edgeManager.initialize();
                    newEdgeManagers.put(entry.getKey(), edgeManager);
                }
                return null;
            }
        }).when(mockContext).setVertexParallelism(
                Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());

        // --- Scatter-gather sources with 0 tasks: vertex started first, then source configured. ---
        manager = this.createManager(conf, mockContext, 0.1f, 0.1f);
        Mockito.verify(mockContext, Mockito.times(2)).vertexReconfigurationPlanned();
        Assert.assertTrue(manager.bipartiteSources == 2);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
        manager.onVertexStarted(null);
        // Tasks stay pending until the broadcast source reports CONFIGURED.
        Assert.assertFalse(manager.pendingTasks.isEmpty());
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertTrue(scheduledTasks.size() == 4);
        scheduledTasks.clear();

        // --- Same setup, reversed order: source configured first, then vertex started. ---
        manager = this.createManager(conf, mockContext, 0.1f, 0.1f);
        Mockito.verify(mockContext, Mockito.times(3)).vertexReconfigurationPlanned();
        Assert.assertTrue(manager.bipartiteSources == 2);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        // Nothing happens before the vertex itself has started.
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertTrue(scheduledTasks.size() == 0);
        manager.onVertexStarted(null);
        Mockito.verify(mockContext, Mockito.times(2)).doneReconfiguringVertex();
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertTrue(scheduledTasks.size() == 4);

        // --- Reported output size 5000: parallelism must not be changed. ---
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
        ByteBuffer payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer();
        VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
        manager = this.createManager(conf, mockContext, 0.1f, 0.1f);
        Mockito.verify(mockContext, Mockito.times(4)).vertexReconfigurationPlanned();
        manager.onVertexStarted(null);
        Assert.assertTrue(manager.pendingTasks.size() == 4);
        Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(0));
        // The broadcast source is not configured yet: no reconfiguration is finalized.
        Mockito.verify(mockContext, Mockito.times(0)).setVertexParallelism(
                Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(2)).doneReconfiguringVertex();
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Mockito.verify(mockContext, Mockito.times(0)).setVertexParallelism(
                Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(3)).doneReconfiguringVertex();
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(4, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);

        // --- Small outputs (size 1): no decision yet, and duplicate completions are not recounted. ---
        scheduledTasks.clear();
        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
        vmEvent = VertexManagerEvent.create("Vertex", payload);
        manager = this.createManager(conf, mockContext, 0.01f, 0.75f);
        manager.onVertexStarted(null);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(0));
        Assert.assertTrue(!manager.determineParallelismAndApply());
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
        // Same source task completes again: the event is counted, the completion is not.
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(0));
        Assert.assertTrue(!manager.determineParallelismAndApply());
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);

        // An event reporting 1200 bytes lets the manager decide: parallelism becomes 2.
        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(1200L).build().toByteString().asReadOnlyByteBuffer();
        vmEvent = VertexManagerEvent.create("Vertex", payload);
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertTrue(manager.determineParallelismAndApply());
        Mockito.verify(mockContext, Mockito.times(1)).setVertexParallelism(
                Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(mockSrcVertexId2, Integer.valueOf(0));
        Assert.assertEquals(1, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1202L, manager.completedSourceTasksOutputSize);

        // --- 20 + 20 source tasks, 40 managed tasks: parallelism 4 is set on the 8th completion. ---
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
        scheduledTasks.clear();
        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(100L).build().toByteString().asReadOnlyByteBuffer();
        vmEvent = VertexManagerEvent.create("Vertex", payload);
        manager = this.createManager(conf, mockContext, 0.0f, 0.2f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals(40, manager.pendingTasks.size());
        Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        for (int i = 0; i < 7; ++i) {
            manager.onVertexManagerEventReceived(vmEvent);
            manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(i));
            Mockito.verify(mockContext, Mockito.times(0)).setVertexParallelism(
                    Mockito.eq(4), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        }
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(mockSrcVertexId2, Integer.valueOf(8));
        Mockito.verify(mockContext, Mockito.times(1)).setVertexParallelism(
                Mockito.eq(4), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());

        // --- Back to 2 + 2 source tasks, 4 managed tasks, 500 bytes per completed source task. ---
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
        scheduledTasks.clear();
        payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
        vmEvent = VertexManagerEvent.create("Vertex", payload);
        manager = this.createManager(conf, mockContext, 0.5f, 0.5f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
        // A completion from the broadcast source does not count towards the bipartite total.
        manager.onSourceTaskCompleted(mockSrcVertexId3, Integer.valueOf(0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
        // Duplicate completion of the same task changes nothing.
        manager.onSourceTaskCompleted(mockSrcVertexId1, Integer.valueOf(0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
        // Second distinct completion: parallelism is set to 2 and both remaining tasks are scheduled.
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(mockSrcVertexId2, Integer.valueOf(1));
        // Cumulative count: one call from the earlier scenario plus this one.
        Mockito.verify(mockContext, Mockito.times(2)).setVertexParallelism(
                Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(2, scheduledTasks.size());
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(0)));
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(1)));
        Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
        // A further completion must not trigger another parallelism change.
        manager.onSourceTaskCompleted(mockSrcVertexId2, Integer.valueOf(0));
        Mockito.verify(mockContext, Mockito.times(2)).setVertexParallelism(
                Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());

        // --- Routing behaviour of one of the edge managers installed by the reduction. ---
        EdgeManagerPlugin edgeManager = newEdgeManagers.values().iterator().next();
        Map<Integer, List<Integer>> targets = Maps.newHashMap();
        DataMovementEvent dmEvent = DataMovementEvent.create(1, ByteBuffer.wrap(new byte[0]));
        Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
        Assert.assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(0));
        // Source task 1, source output 1 -> destination task 0, input index 3.
        edgeManager.routeDataMovementEventToDestination(dmEvent, 1, dmEvent.getSourceIndex(), targets);
        Assert.assertEquals(1, targets.size());
        Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
        Assert.assertEquals(0, e.getKey().intValue());
        Assert.assertEquals(1, e.getValue().size());
        Assert.assertEquals(3, e.getValue().get(0).intValue());
        targets.clear();
        // Source task 0, source output 2 -> destination task 1, input index 0.
        dmEvent = DataMovementEvent.create(2, ByteBuffer.wrap(new byte[0]));
        edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), targets);
        Assert.assertEquals(1, targets.size());
        e = targets.entrySet().iterator().next();
        Assert.assertEquals(1, e.getKey().intValue());
        Assert.assertEquals(1, e.getValue().size());
        Assert.assertEquals(0, e.getValue().get(0).intValue());
        targets.clear();
        // Failure of source task 2 is routed to both destination tasks, input indices 4 and 5.
        edgeManager.routeInputSourceTaskFailedEventToDestination(2, targets);
        Assert.assertEquals(2, targets.size());
        for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
            Assert.assertTrue(entry.getKey() == 0 || entry.getKey() == 1);
            Assert.assertEquals(2, entry.getValue().size());
            Assert.assertEquals(4, entry.getValue().get(0).intValue());
            Assert.assertEquals(5, entry.getValue().get(1).intValue());
        }
    }

    @Test(timeout=5000L)
    public void testShuffleVertexManagerSlowStart() {
        Configuration conf = new Configuration();
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId3 = "Vertex3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "Vertex4";
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)mockManagedVertexId);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)3);
        mockInputVertices.put(mockSrcVertexId3, eProp3);
        try {
            manager = this.createManager(conf, mockContext, 0.1f, 0.1f);
            Assert.assertFalse((boolean)true);
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Atleast 1 bipartite source should exist"));
        }
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);
        manager = this.createManager(conf, mockContext, 0.1f, 0.1f);
        Assert.assertTrue((manager.bipartiteSources == 2 ? 1 : 0) != 0);
        final HashSet scheduledTasks = new HashSet();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.TaskWithLocationHint task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext)).scheduleVertexTasks(Mockito.anyList());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)0);
        manager.onVertexStarted(null);
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        try {
            manager = this.createManager(conf, mockContext, -0.1f, 0.0f);
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
        try {
            manager = this.createManager(conf, mockContext, 0.5f, 0.3f);
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
        manager = this.createManager(conf, mockContext, 0.0f, 0.0f);
        manager.onVertexStarted(null);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalTasksToSchedule == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, 0.25f, 0.25f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, 1.0f, 1.0f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, 1.0f, 1.0f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, 0.25f, 0.75f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, 0.25f, 1.0f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
    }

    /**
     * Scenario: vertex "R2" (3 tasks) has three SCATTER_GATHER inputs (R1, M2, M3),
     * auto-parallelism is enabled and both source fractions are set to 0.001.
     * <p>
     * Part 1: while only M2 and M3 are configured, source task completions must
     * neither schedule tasks nor trigger a parallelism change. Once R1 is
     * configured, exactly one {@code setVertexParallelism(1, ...)} call is expected
     * and no task may remain pending.
     * <p>
     * Part 2: R1 and M2 are re-stubbed to report zero tasks; after M3 is configured
     * a single M3 task completion must leave no task pending.
     */
    @Test(timeout=5000L)
    public void test_Tez1649_with_scatter_gather_edges() {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);

        // Three source vertices, all connected to R2 through scatter-gather edges.
        HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        final String mockManagedVertexId_R2 = "R2";
        mockInputVertices_R2.put(r1, eProp1);
        mockInputVertices_R2.put(m2, eProp2);
        mockInputVertices_R2.put(m3, eProp3);

        // Every vertex, including the managed one, initially reports 3 tasks.
        final VertexManagerPluginContext mockContext_R2 = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
        Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);

        // When parallelism is set to 2: report 2 tasks for R2 from then on and
        // instantiate the edge manager described for each source vertex.
        // NOTE(review): this stub matches parallelism 2 only, while the verification
        // further down expects a call with parallelism 1, so this answer presumably
        // never runs in this test - confirm.
        final HashMap<String, EdgeManagerPlugin> edgeManagerR2 = new HashMap<String, EdgeManagerPlugin>();
        Mockito.doAnswer(new Answer<Object>() {

            public Object answer(InvocationOnMock invocation) throws Exception {
                Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(2);
                edgeManagerR2.clear();
                // Third argument of setVertexParallelism: source vertex name -> edge manager descriptor.
                @SuppressWarnings("unchecked")
                Map<String, EdgeManagerPluginDescriptor> descriptors =
                        (Map<String, EdgeManagerPluginDescriptor>) invocation.getArguments()[2];
                for (Map.Entry<String, EdgeManagerPluginDescriptor> entry : descriptors.entrySet()) {
                    final UserPayload userPayload = entry.getValue().getUserPayload();
                    // Minimal context: fixed 2x2 task counts, no vertex names.
                    EdgeManagerPluginContext emContext = new EdgeManagerPluginContext() {

                        public UserPayload getUserPayload() {
                            return userPayload;
                        }

                        public String getSourceVertexName() {
                            return null;
                        }

                        public String getDestinationVertexName() {
                            return null;
                        }

                        public int getSourceVertexNumTasks() {
                            return 2;
                        }

                        public int getDestinationVertexNumTasks() {
                            return 2;
                        }
                    };
                    EdgeManagerPlugin edgeManager = (EdgeManagerPlugin) ReflectionUtils.createClazzInstance(
                            entry.getValue().getClassName(),
                            new Class[]{EdgeManagerPluginContext.class},
                            new Object[]{emContext});
                    edgeManager.initialize();
                    edgeManagerR2.put(entry.getKey(), edgeManager);
                }
                return null;
            }
        }).when(mockContext_R2).setVertexParallelism(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());

        // Vertex manager event carrying an output size of 50.
        ByteBuffer payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer();
        VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);

        ShuffleVertexManager manager = this.createManager(conf, mockContext_R2, 0.001f, 0.001f);
        Assert.assertEquals(3, manager.bipartiteSources);

        // Record the task indices handed to the most recent scheduleVertexTasks call.
        final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
        Mockito.doAnswer(new Answer<Object>() {

            public Object answer(InvocationOnMock invocation) {
                scheduledTasks.clear();
                @SuppressWarnings("unchecked")
                List<VertexManagerPluginContext.TaskWithLocationHint> tasks =
                        (List<VertexManagerPluginContext.TaskWithLocationHint>) invocation.getArguments()[0];
                for (VertexManagerPluginContext.TaskWithLocationHint task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when(mockContext_R2).scheduleVertexTasks(Mockito.anyList());

        // Part 1: only M2 and M3 are configured at first.
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // Finishing all of M3 does not release any pending task.
        manager.onSourceTaskCompleted(m3, Integer.valueOf(0));
        manager.onSourceTaskCompleted(m3, Integer.valueOf(1));
        manager.onSourceTaskCompleted(m3, Integer.valueOf(2));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);

        // Neither does a completion from M2.
        manager.onSourceTaskCompleted(m2, Integer.valueOf(0));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);

        // No parallelism change so far; configuring R1 must trigger exactly one, to parallelism 1.
        Mockito.verify(mockContext_R2, Mockito.times(0)).setVertexParallelism(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Mockito.verify(mockContext_R2, Mockito.times(1)).setVertexParallelism(Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap(), Mockito.anyMap());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // Part 2: R1 and M2 now report zero tasks; only M3 contributes source tasks.
        scheduledTasks.clear();
        Mockito.when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
        Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(0);
        Mockito.when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
        manager = this.createManager(conf, mockContext_R2, 0.001f, 0.001f);
        manager.onVertexStarted(null);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // A single M3 completion after M3 is configured releases everything.
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(m3, Integer.valueOf(0));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());
    }

    /**
     * Scenario: vertex "R2" (3 tasks) has one SCATTER_GATHER input (R1) and two
     * BROADCAST inputs (M2, M3); auto-parallelism is enabled and both source
     * fractions are set to 0.001. Only R1 is counted as a bipartite source.
     * <p>
     * Four sub-scenarios are run against fresh managers sharing the same mock
     * context. Each checks that tasks stay pending while a source vertex is still
     * unconfigured and that none is pending after the final step, for different
     * orderings of task completions and CONFIGURED notifications and for
     * broadcast sources that report zero tasks.
     */
    @Test(timeout=5000L)
    public void test_Tez1649_with_mixed_edges() {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);

        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create(
                EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create(
                EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        String mockManagedVertexId = "R2";
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);

        // Every vertex, including the managed one, initially reports 3 tasks.
        VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);

        ShuffleVertexManager manager = this.createManager(conf, mockContext, 0.001f, 0.001f);
        Assert.assertEquals(1, manager.bipartiteSources);

        // Record the task indices handed to the most recent scheduleVertexTasks call.
        final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
        Mockito.doAnswer(new Answer<Object>() {

            public Object answer(InvocationOnMock invocation) {
                scheduledTasks.clear();
                @SuppressWarnings("unchecked")
                List<VertexManagerPluginContext.TaskWithLocationHint> tasks =
                        (List<VertexManagerPluginContext.TaskWithLocationHint>) invocation.getArguments()[0];
                for (VertexManagerPluginContext.TaskWithLocationHint task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when(mockContext).scheduleVertexTasks(Mockito.anyList());

        // Scenario 1: R1 and M2 configured; M3 is configured last.
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(r1, Integer.valueOf(0));
        manager.onSourceTaskCompleted(r1, Integer.valueOf(1));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        manager.onSourceTaskCompleted(m2, Integer.valueOf(0));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        // Nothing is released until the last source vertex is configured.
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // Scenario 2: all R1 tasks finish before either broadcast source is configured.
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, 0.001f, 0.001f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        manager.onSourceTaskCompleted(r1, Integer.valueOf(0));
        manager.onSourceTaskCompleted(r1, Integer.valueOf(1));
        manager.onSourceTaskCompleted(r1, Integer.valueOf(2));
        Assert.assertEquals(3, manager.pendingTasks.size());
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // Scenario 3: M2 reports zero tasks and is never notified as configured.
        scheduledTasks.clear();
        // This manager is created and started before the re-stubbing below and is
        // then replaced; the sequence is kept exactly as in the original test.
        manager = this.createManager(conf, mockContext, 0.001f, 0.001f);
        manager.onVertexStarted(null);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
        manager = this.createManager(conf, mockContext, 0.001f, 0.001f);
        manager.onVertexStarted(null);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(r1, Integer.valueOf(0));
        manager.onSourceTaskCompleted(r1, Integer.valueOf(1));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // Scenario 4: the manager is started first, then M3 is re-stubbed to zero
        // tasks as well (M2 already reports zero from scenario 3).
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, 0.001f, 0.001f);
        manager.onVertexStarted(null);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(r1, Integer.valueOf(0));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());
    }

    /**
     * Builds an initialized {@link ShuffleVertexManager} bound to the given context.
     * <p>
     * The min/max source fractions are written into {@code conf} (which is
     * therefore mutated), the configuration is serialized into a user payload,
     * and {@code context.getUserPayload()} is stubbed to return that payload
     * before the manager is constructed and initialized.
     *
     * @param conf    configuration to update and serialize
     * @param context Mockito mock that will serve the payload to the manager
     * @param min     value for "tez.shuffle-vertex-manager.min-src-fraction"
     * @param max     value for "tez.shuffle-vertex-manager.max-src-fraction"
     * @return the manager, after {@code initialize()} has been called
     * @throws RuntimeException wrapping the {@link IOException} if the payload cannot be created
     */
    private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, float min, float max) {
        conf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", min);
        conf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", max);

        final UserPayload serializedConf;
        try {
            serializedConf = TezUtils.createUserPayloadFromConf(conf);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }

        // The manager reads its settings from the context's user payload.
        Mockito.when(context.getUserPayload()).thenReturn(serializedConf);

        ShuffleVertexManager vertexManager = new ShuffleVertexManager(context);
        vertexManager.initialize();
        return vertexManager;
    }
}

