package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.VertexIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager.class */
public class TestRootInputVertexManager {
    List<TaskAttemptIdentifier> emptyCompletions = null;

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/TestRootInputVertexManager$ScheduledTasksAnswer.class */
    protected static class ScheduledTasksAnswer implements Answer<Object> {
        private List<Integer> scheduledTasks;

        public ScheduledTasksAnswer(List<Integer> list) {
            this.scheduledTasks = list;
        }

        public Object answer(InvocationOnMock invocationOnMock) throws IOException {
            Object[] arguments = invocationOnMock.getArguments();
            this.scheduledTasks.clear();
            Iterator it = ((List) arguments[0]).iterator();
            while (it.hasNext()) {
                this.scheduledTasks.add(Integer.valueOf(((VertexManagerPluginContext.ScheduleTaskRequest) it.next()).getTaskIndex()));
            }
            return null;
        }
    }

    /**
     * Verifies that a second root input delivering data events is rejected.
     *
     * <p>The call for "input1", carrying one {@link InputDataInformationEvent}, must complete
     * normally. The equivalent call for "input2" must throw an {@link IllegalStateException} whose
     * message starts with the "cannot configure multiple inputs" text.
     */
    @Test(timeout = 5000)
    public void testEventsFromMultipleInputs() throws IOException {
        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        UserPayload payload = TezUtils.createUserPayloadFromConf(new TezConfiguration());
        Mockito.when(context.getVertexName()).thenReturn("vertex1");
        Mockito.when(context.getVertexNumTasks(Matchers.eq("vertex1"))).thenReturn(1);
        Mockito.when(context.getUserPayload()).thenReturn(payload);

        RootInputVertexManager manager = new RootInputVertexManager(context);
        manager.initialize();

        // First input: accepted.
        InputDescriptor firstDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList firstEvents = new LinkedList();
        firstEvents.add(InputDataInformationEvent.createWithSerializedPayload(0, (ByteBuffer) null));
        manager.onRootVertexInitialized("input1", firstDescriptor, firstEvents);

        // Second input: must be refused.
        InputDescriptor secondDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList secondEvents = new LinkedList();
        secondEvents.add(InputDataInformationEvent.createWithSerializedPayload(0, (ByteBuffer) null));
        try {
            manager.onRootVertexInitialized("input2", secondDescriptor, secondEvents);
            Assert.fail("Expecting failure in case of multiple inputs attempting to send events");
        } catch (IllegalStateException expected) {
            String message = expected.getMessage();
            Assert.assertTrue(message.startsWith("RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
        }
    }

    /**
     * Verifies that a second root input attempting to configure the vertex is rejected.
     *
     * <p>The mocked context reports {@code -1} tasks for "vertex1". The call for "input1", carrying
     * one {@link InputConfigureVertexTasksEvent}, must complete normally. The equivalent call for
     * "input2" must throw an {@link IllegalStateException} whose message starts with the
     * "cannot configure multiple inputs" text.
     */
    @Test(timeout = 5000)
    public void testConfigureFromMultipleInputs() throws IOException {
        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        UserPayload payload = TezUtils.createUserPayloadFromConf(new TezConfiguration());
        Mockito.when(context.getVertexName()).thenReturn("vertex1");
        Mockito.when(context.getVertexNumTasks(Matchers.eq("vertex1"))).thenReturn(-1);
        Mockito.when(context.getUserPayload()).thenReturn(payload);

        RootInputVertexManager manager = new RootInputVertexManager(context);
        manager.initialize();

        // First input: accepted.
        InputDescriptor firstDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList firstEvents = new LinkedList();
        firstEvents.add(InputConfigureVertexTasksEvent.create(1, (VertexLocationHint) null, (InputSpecUpdate) null));
        manager.onRootVertexInitialized("input1", firstDescriptor, firstEvents);

        // Second input: must be refused.
        InputDescriptor secondDescriptor = Mockito.mock(InputDescriptor.class);
        LinkedList secondEvents = new LinkedList();
        secondEvents.add(InputConfigureVertexTasksEvent.create(1, (VertexLocationHint) null, (InputSpecUpdate) null));
        try {
            manager.onRootVertexInitialized("input2", secondDescriptor, secondEvents);
            Assert.fail("Expecting failure in case of multiple inputs attempting to send events");
        } catch (IllegalStateException expected) {
            String message = expected.getMessage();
            Assert.assertTrue(message.startsWith("RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
        }
    }

    /**
     * Walks a series of slow-start configurations through a mocked vertex ("Vertex3") that has two
     * broadcast-edge sources ("Vertex1" and "Vertex2").
     *
     * <p>Each scenario builds a fresh manager via {@code createRootInputVertexManager} with the
     * given min/max source fractions, reports source-task completions, and asserts on
     * {@code pendingTasks}, {@code totalNumSourceTasks}, {@code numSourceTasksCompleted} and the
     * task indexes captured from {@code scheduleTasks}. The captured list is overwritten on every
     * {@code scheduleTasks} call (see {@code ScheduledTasksAnswer}), so its size reflects only the
     * most recent call. All scenarios share one {@code Configuration} and one mock context, so
     * their order matters.
     *
     * <p>NOTE(review): many {@code assertEquals} calls below pass the actual value first and the
     * expected value second. JUnit documents the order as (expected, actual); this only affects
     * failure messages, not pass/fail.
     */
    @Test(timeout = 5000)
    public void testRootInputVertexManagerSlowStart() {
        // Shared fixture: Vertex3 has 3 tasks and two broadcast sources, Vertex1 and Vertex2.
        Configuration configuration = new Configuration();
        HashMap hashMap = new HashMap();
        EdgeProperty create = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        EdgeProperty create2 = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        VertexManagerPluginContext vertexManagerPluginContext = (VertexManagerPluginContext) Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(vertexManagerPluginContext.getVertexStatistics((String) Matchers.any(String.class))).thenReturn((VertexStatistics) Mockito.mock(VertexStatistics.class));
        Mockito.when(vertexManagerPluginContext.getInputVertexEdgeProperties()).thenReturn(hashMap);
        Mockito.when(vertexManagerPluginContext.getVertexName()).thenReturn("Vertex3");
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex3"))).thenReturn(3);
        hashMap.put("Vertex1", create);
        hashMap.put("Vertex2", create2);

        // Scenario: min = max = 0.1, both sources report 0 tasks.
        // Expect all 3 tasks scheduled once both sources are CONFIGURED.
        RootInputVertexManager createRootInputVertexManager = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        LinkedList newLinkedList = Lists.newLinkedList();
        // Capture the task indexes of each scheduleTasks call into newLinkedList.
        ((VertexManagerPluginContext) Mockito.doAnswer(new ScheduledTasksAnswer(newLinkedList)).when(vertexManagerPluginContext)).scheduleTasks(Matchers.anyList());
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(0);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(0);
        createRootInputVertexManager.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 3L);

        // Invalid fraction settings: each must be rejected with IllegalArgumentException.
        // NOTE(review): Assert.assertTrue(false) works as a "should not reach here" marker because
        // AssertionError is not caught below, but Assert.fail(message) would be clearer.
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(2);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(2);
        // min = -0.1, max = 0.0
        try {
            createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(-0.1f), Float.valueOf(0.0f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e) {
            Assert.assertTrue(e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        // min = 0.0, max = 95.0
        try {
            createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.0f), Float.valueOf(95.0f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e2) {
            Assert.assertTrue(e2.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        // min = 0.5 greater than max = 0.3
        try {
            createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.5f), Float.valueOf(0.3f));
            Assert.assertTrue(false);
        } catch (IllegalArgumentException e3) {
            Assert.assertTrue(e3.getMessage().contains("Invalid values for slowStartMinFraction"));
        }

        // Scenario: 20 tasks per source, min = 0.975, max fraction key unset (null).
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(20);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(20);
        newLinkedList.clear();
        RootInputVertexManager createRootInputVertexManager2 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.975f), null);
        createRootInputVertexManager2.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager2.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager2.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(3L, createRootInputVertexManager2.pendingTasks.size());
        Assert.assertEquals(20 * 2, createRootInputVertexManager2.totalNumSourceTasks);
        Assert.assertEquals(0L, createRootInputVertexManager2.numSourceTasksCompleted);
        // f is 19.5; the inner loop completes tasks 1..19 of each source and breaks at i == 18,
        // leaving only task 0 of each source outstanding.
        float f = 0.975f * 20;
        for (String str : new String[]{"Vertex1", "Vertex2"}) {
            for (int i = 0; i < vertexManagerPluginContext.getVertexNumTasks(str); i++) {
                createRootInputVertexManager2.onSourceTaskCompleted(createTaskAttemptIdentifier(str, i + 1));
                if (i + 2 >= f) {
                    break;
                }
            }
        }
        // Everything is still pending and nothing has been scheduled yet.
        Assert.assertEquals(createRootInputVertexManager2.totalTasksToSchedule, createRootInputVertexManager2.pendingTasks.size());
        Assert.assertEquals(0L, newLinkedList.size());
        // Completing task 0 of Vertex1 is expected to leave all 3 tasks pending.
        createRootInputVertexManager2.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(3L, createRootInputVertexManager2.pendingTasks.size());
        Assert.assertEquals(0L, newLinkedList.size());
        // Completing task 0 of Vertex2 (the last outstanding source task) schedules everything.
        createRootInputVertexManager2.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(0L, createRootInputVertexManager2.pendingTasks.size());
        Assert.assertEquals(createRootInputVertexManager2.totalTasksToSchedule, newLinkedList.size());

        // Scenario: 2 tasks per source, min = max = 0.0.
        // Expect all 3 tasks scheduled as soon as both sources are CONFIGURED, with no completions.
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(2);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(2);
        RootInputVertexManager createRootInputVertexManager3 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.0f), Float.valueOf(0.0f));
        createRootInputVertexManager3.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(createRootInputVertexManager3.totalTasksToSchedule, 3L);
        Assert.assertEquals(createRootInputVertexManager3.numSourceTasksCompleted, 0L);
        createRootInputVertexManager3.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager3.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager3.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager3.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 3L);

        // Scenario: 2 tasks per source, min = max = 0.25.
        // Expect tasks to stay pending after 1 of 4 completions and to be scheduled after 2.
        RootInputVertexManager createRootInputVertexManager4 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.25f));
        createRootInputVertexManager4.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager4.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager4.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        // NOTE(review): the next two assertions are immediately repeated; the duplicates add no coverage.
        Assert.assertEquals(createRootInputVertexManager4.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager4.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager4.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager4.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager4.numSourceTasksCompleted, 0L);
        createRootInputVertexManager4.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager4.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager4.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager4.numSourceTasksCompleted, 1L);
        createRootInputVertexManager4.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(createRootInputVertexManager4.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager4.numSourceTasksCompleted, 2L);

        // Scenario: 2 tasks per source, min = max = 1.0.
        // Expect all 3 tasks to stay pending until all 4 source tasks have completed.
        RootInputVertexManager createRootInputVertexManager5 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        createRootInputVertexManager5.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager5.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager5.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager5.numSourceTasksCompleted, 0L);
        createRootInputVertexManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.numSourceTasksCompleted, 1L);
        createRootInputVertexManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.numSourceTasksCompleted, 2L);
        createRootInputVertexManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.numSourceTasksCompleted, 3L);
        createRootInputVertexManager5.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertEquals(createRootInputVertexManager5.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager5.numSourceTasksCompleted, 4L);

        // Same min = max = 1.0 scenario repeated with a fresh manager and identical expectations.
        RootInputVertexManager createRootInputVertexManager6 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        createRootInputVertexManager6.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager6.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager6.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.totalNumSourceTasks, 4L);
        Assert.assertEquals(createRootInputVertexManager6.numSourceTasksCompleted, 0L);
        createRootInputVertexManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.numSourceTasksCompleted, 1L);
        createRootInputVertexManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.numSourceTasksCompleted, 2L);
        createRootInputVertexManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.numSourceTasksCompleted, 3L);
        createRootInputVertexManager6.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertEquals(createRootInputVertexManager6.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager6.numSourceTasksCompleted, 4L);

        // Scenario: 4 tasks per source (8 total), min = 0.25, max = 0.75.
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex1"))).thenReturn(4);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex2"))).thenReturn(4);
        RootInputVertexManager createRootInputVertexManager7 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        createRootInputVertexManager7.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager7.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager7.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager7.totalNumSourceTasks, 8L);
        // 2 of 8 completed: nothing scheduled yet.
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager7.numSourceTasksCompleted, 2L);
        // A repeated completion for Vertex2 task 1 must not change the completed count.
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager7.numSourceTasksCompleted, 2L);
        // 4 of 8 completed: 1 task left pending; the latest scheduleTasks call carried 2 tasks.
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 1L);
        Assert.assertEquals(newLinkedList.size(), 2L);
        Assert.assertEquals(createRootInputVertexManager7.numSourceTasksCompleted, 4L);
        // 6 of 8 completed: nothing pending; the latest scheduleTasks call carried the last task.
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager7.numSourceTasksCompleted, 6L);
        // A further completion after everything is scheduled must not schedule anything.
        newLinkedList.clear();
        createRootInputVertexManager7.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        Assert.assertEquals(createRootInputVertexManager7.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 0L);
        Assert.assertEquals(createRootInputVertexManager7.numSourceTasksCompleted, 7L);

        // Scenario: 4 tasks per source (8 total), min = 0.25, max = 1.0.
        // Expect one task per latest scheduleTasks call at 4, 6 and 8 completions.
        RootInputVertexManager createRootInputVertexManager8 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(1.0f));
        createRootInputVertexManager8.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager8.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager8.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager8.pendingTasks.size(), 3L);
        Assert.assertEquals(createRootInputVertexManager8.totalNumSourceTasks, 8L);
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager8.pendingTasks.size(), 2L);
        Assert.assertEquals(newLinkedList.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager8.numSourceTasksCompleted, 4L);
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertEquals(createRootInputVertexManager8.pendingTasks.size(), 1L);
        Assert.assertEquals(newLinkedList.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager8.numSourceTasksCompleted, 6L);
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        createRootInputVertexManager8.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 3));
        Assert.assertEquals(createRootInputVertexManager8.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager8.numSourceTasksCompleted, 8L);

        // Scenario: Vertex3 reduced to a single task; 8 source tasks, min = 0.25, max = 0.75.
        newLinkedList.clear();
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("Vertex3"))).thenReturn(1);
        RootInputVertexManager createRootInputVertexManager9 = createRootInputVertexManager(configuration, vertexManagerPluginContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        createRootInputVertexManager9.onVertexStarted(this.emptyCompletions);
        createRootInputVertexManager9.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        createRootInputVertexManager9.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        Assert.assertEquals(createRootInputVertexManager9.pendingTasks.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager9.totalNumSourceTasks, 8L);
        // 2 of 8 completed: the single task is still pending.
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 0));
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 1));
        Assert.assertEquals(createRootInputVertexManager9.pendingTasks.size(), 1L);
        Assert.assertEquals(newLinkedList.size(), 0L);
        Assert.assertEquals(createRootInputVertexManager9.numSourceTasksCompleted, 2L);
        // 4 of 8 completed: the single task is scheduled.
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 1));
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals(createRootInputVertexManager9.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 1L);
        Assert.assertEquals(createRootInputVertexManager9.numSourceTasksCompleted, 4L);
        // Later completions are counted but schedule nothing further.
        newLinkedList.clear();
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 2));
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex2", 2));
        Assert.assertEquals(createRootInputVertexManager9.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 0L);
        Assert.assertEquals(createRootInputVertexManager9.numSourceTasksCompleted, 6L);
        newLinkedList.clear();
        createRootInputVertexManager9.onSourceTaskCompleted(createTaskAttemptIdentifier("Vertex1", 3));
        Assert.assertEquals(createRootInputVertexManager9.pendingTasks.size(), 0L);
        Assert.assertEquals(newLinkedList.size(), 0L);
        Assert.assertEquals(createRootInputVertexManager9.numSourceTasksCompleted, 7L);
    }

    /**
     * Verifies that completions handed to {@code onVertexStarted} are counted.
     *
     * <p>A manager with one broadcast source ("Vertex1", 3 tasks) and min = max = 0.1 starts with
     * zero completed source tasks. After {@code onVertexStarted} receives a single completion for
     * task 0 of that source, {@code numSourceTasksCompleted} must be 1.
     */
    @Test
    public void testTezDrainCompletionsOnVertexStart() throws IOException {
        Configuration configuration = new Configuration();

        // One broadcast edge from "Vertex1" into the vertex under test.
        EdgeProperty broadcastEdge = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        HashMap<String, EdgeProperty> sourceEdges = new HashMap<String, EdgeProperty>();
        sourceEdges.put("Vertex1", broadcastEdge);

        VertexStatistics statistics = Mockito.mock(VertexStatistics.class);
        VertexManagerPluginContext context = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(context.getVertexStatistics(Matchers.any(String.class))).thenReturn(statistics);
        Mockito.when(context.getInputVertexEdgeProperties()).thenReturn(sourceEdges);
        Mockito.when(context.getVertexNumTasks("Vertex1")).thenReturn(3);

        RootInputVertexManager manager = createRootInputVertexManager(configuration, context, Float.valueOf(0.1f), Float.valueOf(0.1f));
        Assert.assertEquals(0L, manager.numSourceTasksCompleted);

        List<TaskAttemptIdentifier> completions = Collections.singletonList(createTaskAttemptIdentifier("Vertex1", 0));
        manager.onVertexStarted(completions);
        Assert.assertEquals(1L, manager.numSourceTasksCompleted);
    }

    /**
     * Builds and initializes a {@code RootInputVertexManager} whose user payload is serialized from
     * {@code configuration} after the requested slow-start fractions have been applied.
     *
     * <p>The passed {@code configuration} is mutated: for each fraction, a non-null value is written
     * under its key and a {@code null} value unsets the key. Slow start is switched on when at least
     * one fraction is non-null; it is never switched off here, so a flag set by an earlier call on
     * the same {@code configuration} stays in place. The context's {@code getUserPayload()} is
     * re-stubbed to return the new payload.
     *
     * @param configuration configuration to update and serialize
     * @param vertexManagerPluginContext mock context handed to the manager
     * @param f value for {@code tez.root-input-vertex-manager.min-src-fraction}, or {@code null} to
     *     unset it
     * @param f2 value for {@code tez.root-input-vertex-manager.max-src-fraction}, or {@code null} to
     *     unset it
     * @return the initialized manager
     * @throws RuntimeException wrapping any {@link IOException} raised while building the manager
     */
    static RootInputVertexManager createRootInputVertexManager(Configuration configuration, VertexManagerPluginContext vertexManagerPluginContext, Float f, Float f2) {
        applyOptionalFraction(configuration, "tez.root-input-vertex-manager.min-src-fraction", f);
        applyOptionalFraction(configuration, "tez.root-input-vertex-manager.max-src-fraction", f2);

        boolean noFractionGiven = f == null && f2 == null;
        if (!noFractionGiven) {
            configuration.setBoolean("tez.root-input-vertex-manager.enable.slow-start", true);
        }

        try {
            Mockito.when(vertexManagerPluginContext.getUserPayload()).thenReturn(TezUtils.createUserPayloadFromConf(configuration));
            RootInputVertexManager manager = new RootInputVertexManager(vertexManagerPluginContext);
            manager.initialize();
            return manager;
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /** Writes {@code value} under {@code key}, or removes the key when {@code value} is {@code null}. */
    private static void applyOptionalFraction(Configuration configuration, String key, Float value) {
        if (value == null) {
            configuration.unset(key);
            return;
        }
        configuration.setFloat(key, value.floatValue());
    }

    /**
     * Creates a mocked {@link TaskAttemptIdentifier} for the given vertex name and task id.
     *
     * <p>The returned mock reports identifier {@code 0}. Its task identifier reports {@code i} and
     * is linked to a vertex identifier whose name is {@code str}.
     *
     * @param str name reported by the mocked vertex identifier
     * @param i identifier reported by the mocked task identifier
     * @return the mocked attempt identifier
     */
    public static TaskAttemptIdentifier createTaskAttemptIdentifier(String str, int i) {
        VertexIdentifier vertex = Mockito.mock(VertexIdentifier.class);
        TaskIdentifier task = Mockito.mock(TaskIdentifier.class);
        TaskAttemptIdentifier attempt = Mockito.mock(TaskAttemptIdentifier.class);

        // Wire the chain attempt -> task -> vertex.
        Mockito.when(vertex.getName()).thenReturn(str);
        Mockito.when(task.getIdentifier()).thenReturn(i);
        Mockito.when(task.getVertexIdentifier()).thenReturn(vertex);
        Mockito.when(attempt.getIdentifier()).thenReturn(0);
        Mockito.when(attempt.getTaskIdentifier()).thenReturn(task);
        return attempt;
    }
}
