/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.VertexIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class TestRootInputVertexManager {
    // Completions argument handed to onVertexStarted(...) by the slow-start test; it is left as null here.
    List<TaskAttemptIdentifier> emptyCompletions = null;

    /**
     * Feeds one {@link InputDataInformationEvent} through "input1" and then a second one
     * through "input2". The second call is expected to throw an
     * {@link IllegalStateException} whose message starts with the
     * "cannot configure multiple inputs" text.
     */
    @Test(timeout=5000L)
    public void testEventsFromMultipleInputs() throws IOException {
        UserPayload payload = TezUtils.createUserPayloadFromConf(new TezConfiguration());

        // Mocked context for a vertex named "vertex1" that reports a single task.
        VertexManagerPluginContext ctx = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.doReturn("vertex1").when(ctx).getVertexName();
        Mockito.doReturn(1).when(ctx).getVertexNumTasks(Mockito.eq("vertex1"));
        Mockito.doReturn(payload).when(ctx).getUserPayload();

        RootInputVertexManager vertexManager = new RootInputVertexManager(ctx);
        vertexManager.initialize();

        // First input: a single data-information event.
        InputDescriptor firstDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList<InputDataInformationEvent> firstEvents = new LinkedList<InputDataInformationEvent>();
        firstEvents.add(InputDataInformationEvent.createWithSerializedPayload(0, null));
        vertexManager.onRootVertexInitialized("input1", firstDescriptor, firstEvents);

        // Second input: same kind of event, this time expected to be rejected.
        InputDescriptor secondDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList<InputDataInformationEvent> secondEvents = new LinkedList<InputDataInformationEvent>();
        secondEvents.add(InputDataInformationEvent.createWithSerializedPayload(0, null));

        try {
            vertexManager.onRootVertexInitialized("input2", secondDescriptor, secondEvents);
            Assert.fail("Expecting failure in case of multiple inputs attempting to send events");
        }
        catch (IllegalStateException expected) {
            String message = expected.getMessage();
            Assert.assertTrue(message.startsWith("RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
        }
    }

    /**
     * Sends an {@link InputConfigureVertexTasksEvent} through "input1" and then another
     * through "input2", with the mocked context reporting -1 tasks for "vertex1".
     * The second call is expected to throw an {@link IllegalStateException} whose message
     * starts with the "cannot configure multiple inputs" text.
     */
    @Test(timeout=5000L)
    public void testConfigureFromMultipleInputs() throws IOException {
        UserPayload payload = TezUtils.createUserPayloadFromConf(new TezConfiguration());

        // Mocked context for "vertex1"; the task count is stubbed as -1.
        VertexManagerPluginContext ctx = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.doReturn("vertex1").when(ctx).getVertexName();
        Mockito.doReturn(-1).when(ctx).getVertexNumTasks(Mockito.eq("vertex1"));
        Mockito.doReturn(payload).when(ctx).getUserPayload();

        RootInputVertexManager vertexManager = new RootInputVertexManager(ctx);
        vertexManager.initialize();

        // First input: one configure-tasks event asking for a single task.
        InputDescriptor firstDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList<InputConfigureVertexTasksEvent> firstEvents = new LinkedList<InputConfigureVertexTasksEvent>();
        firstEvents.add(InputConfigureVertexTasksEvent.create(1, null, null));
        vertexManager.onRootVertexInitialized("input1", firstDescriptor, firstEvents);

        // Second input: an identical configure-tasks event, expected to be rejected.
        InputDescriptor secondDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList<InputConfigureVertexTasksEvent> secondEvents = new LinkedList<InputConfigureVertexTasksEvent>();
        secondEvents.add(InputConfigureVertexTasksEvent.create(1, null, null));

        try {
            vertexManager.onRootVertexInitialized("input2", secondDescriptor, secondEvents);
            Assert.fail("Expecting failure in case of multiple inputs attempting to send events");
        }
        catch (IllegalStateException expected) {
            String message = expected.getMessage();
            Assert.assertTrue(message.startsWith("RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
        }
    }

    /**
     * Drives RootInputVertexManager through a sequence of min/max source-fraction
     * configurations against a single mocked context and, after every step, checks
     * the manager's pendingTasks, totalNumSourceTasks, numSourceTasksCompleted and the
     * task indices captured from the most recent scheduleTasks(...) call.
     *
     * The scenarios share the same mock and the same scheduledTasks list, so they
     * depend on running in this exact order.
     */
    @Test(timeout=5000L)
    public void testRootInputVertexManagerSlowStart() {
        Configuration conf = new Configuration();
        RootInputVertexManager manager = null;
        // Two source vertices ("Vertex1", "Vertex2") with identical BROADCAST edges feed the managed "Vertex3".
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "Vertex3";
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getVertexStatistics((String)Mockito.any())).thenReturn((Object)((VertexStatistics)Mockito.mock(VertexStatistics.class)));
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)mockManagedVertexId);
        // The managed vertex reports 3 tasks until it is re-stubbed to 1 near the end of the test.
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)3);
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);

        // Scenario: min = max = 0.1 and both source vertices report 0 tasks.
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        // scheduledTasks is overwritten (cleared, then refilled) by ScheduledTasksAnswer on every
        // scheduleTasks(...) call, so its size always reflects only the latest call.
        LinkedList scheduledTasks = Lists.newLinkedList();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new ScheduledTasksAnswer(scheduledTasks)).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)0);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        // Expect all 3 managed tasks to have been scheduled with nothing left pending.
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)3L);

        // Scenario: invalid fraction pairs. Each creation must throw IllegalArgumentException;
        // assertTrue(false) fails the test if no exception is raised.
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        try {
            // Negative min fraction.
            manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(-0.1f), Float.valueOf(0.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            // Max fraction of 95.0.
            manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(95.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            // Min fraction (0.5) larger than max fraction (0.3).
            manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.5f), Float.valueOf(0.3f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }

        // Scenario: 20 tasks per source vertex, min = 0.975, max left unset (null).
        int numTasks = 20;
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)numTasks);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)numTasks);
        scheduledTasks.clear();
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.975f), null);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)(numTasks * 2), (long)manager.totalNumSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numSourceTasksCompleted);
        float completedTasksThreshold = 0.975f * (float)numTasks;
        // For each source vertex, complete task ids 1, 2, ... and move on to the next vertex as soon as
        // (i + 2) reaches the threshold (0.975 * 20); task id 0 of each vertex is completed separately below.
        block6: for (String mockSrcVertex : new String[]{mockSrcVertexId1, mockSrcVertexId2}) {
            for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
                manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertex, i + 1));
                if ((float)(i + 2) >= completedTasksThreshold) continue block6;
            }
        }
        // Nothing is expected to have been scheduled yet.
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        // Completing task 0 of Vertex1 alone is still expected to leave everything pending ...
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        // ... while additionally completing task 0 of Vertex2 is expected to schedule every task.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)scheduledTasks.size());

        // Scenario: 2 tasks per source vertex, min = max = 0.0.
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(0.0f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)0L);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        // With both sources configured and no completions, all 3 tasks are expected to be scheduled.
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)3L);

        // Scenario: min = max = 0.25 with 4 source tasks in total.
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.25f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)0L);
        // One completion: still 3 pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)1L);
        // Second completion (one per source vertex so far): all 3 tasks scheduled.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);

        // Scenario: min = max = 1.0; tasks stay pending until all 4 source tasks have completed.
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)0L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)1L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)3L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)4L);

        // Scenario: the same min = max = 1.0 sequence repeated with a fresh manager.
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)4L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)0L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)1L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)3L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)4L);

        // Scenario: 4 tasks per source vertex (8 total), min = 0.25, max = 0.75.
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)4);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)4);
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)8L);
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);
        // Repeated completion of (Vertex2, task 1): the completed counter is expected to stay at 2.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);
        // 4 of 8 completed: the latest scheduleTasks call carried 2 tasks, 1 remains pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)1L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)2L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)4L);
        // 6 of 8 completed: the last pending task is scheduled.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)6L);
        // A further completion after everything was scheduled must not schedule anything new.
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)0L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)7L);

        // Scenario: still 8 source tasks, min = 0.25, max = 1.0.
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)3L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)8L);
        // 4 of 8 completed: 1 task in the latest scheduleTasks call, 2 pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)2L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)4L);
        // 6 of 8 completed: 1 task in the latest scheduleTasks call, 1 pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)1L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)6L);
        // 8 of 8 completed: 1 task in the latest scheduleTasks call, none pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 3));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)8L);

        // Scenario: the managed vertex now reports a single task; 8 source tasks, min = 0.25, max = 0.75.
        scheduledTasks.clear();
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)1);
        manager = TestRootInputVertexManager.createRootInputVertexManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.totalNumSourceTasks, (long)8L);
        // 2 of 8 completed: the single task is still pending.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)1L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)0L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)2L);
        // 4 of 8 completed: the single task is scheduled.
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)1L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)4L);
        // Later completions only advance the counter; no further scheduleTasks output is expected.
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)0L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)6L);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestRootInputVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        Assert.assertEquals((long)manager.pendingTasks.size(), (long)0L);
        Assert.assertEquals((long)scheduledTasks.size(), (long)0L);
        Assert.assertEquals((long)manager.numSourceTasksCompleted, (long)7L);
    }

    /**
     * Checks that a completion passed directly to {@code onVertexStarted} is counted:
     * {@code numSourceTasksCompleted} is 0 right after creation and 1 after the manager is
     * started with a single completed attempt of "Vertex1".
     */
    @Test
    public void testTezDrainCompletionsOnVertexStart() throws IOException {
        String sourceVertex = "Vertex1";
        EdgeProperty broadcastEdge = EdgeProperty.create(
            EdgeProperty.DataMovementType.BROADCAST,
            EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            OutputDescriptor.create("out"),
            InputDescriptor.create("in"));

        // The map is handed to the mock first and populated afterwards, as the stub returns it by reference.
        HashMap<String, EdgeProperty> inputEdges = new HashMap<String, EdgeProperty>();
        VertexManagerPluginContext ctx = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(ctx.getVertexStatistics((String)Mockito.any())).thenReturn(Mockito.mock(VertexStatistics.class));
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        Mockito.when(ctx.getVertexNumTasks(sourceVertex)).thenReturn(3);
        inputEdges.put(sourceVertex, broadcastEdge);

        Configuration conf = new Configuration();
        RootInputVertexManager vertexManager =
            TestRootInputVertexManager.createRootInputVertexManager(conf, ctx, Float.valueOf(0.1f), Float.valueOf(0.1f));
        Assert.assertEquals(0L, (long)vertexManager.numSourceTasksCompleted);

        TaskAttemptIdentifier finishedAttempt = TestRootInputVertexManager.createTaskAttemptIdentifier(sourceVertex, 0);
        vertexManager.onVertexStarted(Collections.singletonList(finishedAttempt));
        Assert.assertEquals(1L, (long)vertexManager.numSourceTasksCompleted);
    }

    /**
     * Builds and initializes a {@link RootInputVertexManager} whose user payload is the
     * serialized form of {@code conf} after the fraction settings below have been applied.
     *
     * @param conf    configuration that is mutated in place and then serialized into the payload
     * @param context mocked plugin context; its {@code getUserPayload()} is stubbed to return the payload
     * @param min     value for the min-src-fraction key, or {@code null} to unset that key
     * @param max     value for the max-src-fraction key, or {@code null} to unset that key
     * @return the manager after {@code initialize()} has been called on it
     * @throws RuntimeException wrapping the {@link IOException} if serializing {@code conf} fails
     */
    static RootInputVertexManager createRootInputVertexManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) {
        final String minKey = "tez.root-input-vertex-manager.min-src-fraction";
        final String maxKey = "tez.root-input-vertex-manager.max-src-fraction";

        if (min == null) {
            conf.unset(minKey);
        } else {
            conf.setFloat(minKey, min.floatValue());
        }

        if (max == null) {
            conf.unset(maxKey);
        } else {
            conf.setFloat(maxKey, max.floatValue());
        }

        // The slow-start flag is only ever switched on here; it is never reset when both fractions are null.
        boolean anyFractionGiven = !(max == null && min == null);
        if (anyFractionGiven) {
            conf.setBoolean("tez.root-input-vertex-manager.enable.slow-start", true);
        }

        UserPayload serializedConf;
        try {
            serializedConf = TezUtils.createUserPayloadFromConf(conf);
        }
        catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
        Mockito.when(context.getUserPayload()).thenReturn(serializedConf);

        RootInputVertexManager vertexManager = new RootInputVertexManager(context);
        vertexManager.initialize();
        return vertexManager;
    }

    /**
     * Creates a mocked attempt identifier chain: attempt (identifier 0) -> task
     * (identifier {@code tId}) -> vertex (name {@code vName}).
     *
     * @param vName name reported by the mocked vertex identifier
     * @param tId   identifier reported by the mocked task identifier
     * @return the mocked task attempt identifier
     */
    public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
        // Innermost link: the vertex, which only knows its name.
        VertexIdentifier vertex = Mockito.mock(VertexIdentifier.class);
        Mockito.when(vertex.getName()).thenReturn(vName);

        // Middle link: the task, tied to the vertex above.
        TaskIdentifier task = Mockito.mock(TaskIdentifier.class);
        Mockito.when(task.getIdentifier()).thenReturn(tId);
        Mockito.when(task.getVertexIdentifier()).thenReturn(vertex);

        // Outermost link: the attempt, always numbered 0.
        TaskAttemptIdentifier attempt = Mockito.mock(TaskAttemptIdentifier.class);
        Mockito.when(attempt.getIdentifier()).thenReturn(0);
        Mockito.when(attempt.getTaskIdentifier()).thenReturn(task);
        return attempt;
    }

    protected static class ScheduledTasksAnswer
    implements Answer<Object> {
        private List<Integer> scheduledTasks;

        public ScheduledTasksAnswer(List<Integer> scheduledTasks) {
            this.scheduledTasks = scheduledTasks;
        }

        public Object answer(InvocationOnMock invocation) throws IOException {
            Object[] args = invocation.getArguments();
            this.scheduledTasks.clear();
            List tasks = (List)args[0];
            for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                this.scheduledTasks.add(task.getTaskIndex());
            }
            return null;
        }
    }
}

