package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerUtils;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/TestFairShuffleVertexManager.class */
public class TestFairShuffleVertexManager extends TestShuffleVertexManagerUtils {
    List<TaskAttemptIdentifier> emptyCompletions = null;

    @Test(timeout = 5000)
    public void testAutoParallelismConfig() throws Exception {
        VertexManagerPluginContext createVertexManagerContext = createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, Lists.newLinkedList(), null);
        FairShuffleVertexManager createManager = createManager(null, createVertexManagerContext, null, Float.valueOf(0.5f));
        ((VertexManagerPluginContext) Mockito.verify(createVertexManagerContext, Mockito.times(1))).vertexReconfigurationPlanned();
        Assert.assertTrue(createManager.config.isAutoParallelismEnabled());
        Assert.assertTrue(createManager.config.getDesiredTaskInputDataSize() == 1000 * MB);
        Assert.assertTrue(createManager.config.getMinFraction() == 0.25f);
        Assert.assertTrue(createManager.config.getMaxFraction() == 0.5f);
        FairShuffleVertexManager createManager2 = createManager(null, createVertexManagerContext, null, null, null, null);
        ((VertexManagerPluginContext) Mockito.verify(createVertexManagerContext, Mockito.times(1))).vertexReconfigurationPlanned();
        Assert.assertTrue(!createManager2.config.isAutoParallelismEnabled());
        Assert.assertTrue(createManager2.config.getDesiredTaskInputDataSize() == FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT);
        Assert.assertTrue(createManager2.config.getMinFraction() == 0.25f);
        Assert.assertTrue(createManager2.config.getMaxFraction() == 0.75f);
    }

    @Test(timeout = 5000)
    public void testInvalidSetup() {
        try {
            createFairShuffleVertexManager(new Configuration(), createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, Lists.newLinkedList(), null), FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, Long.valueOf(1000 * MB), Float.valueOf(0.001f), Float.valueOf(0.001f)).onVertexStarted(this.emptyCompletions);
            Assert.assertFalse(true);
        } catch (TezUncheckedException e) {
            Assert.assertTrue(e.getMessage().contains("Having more than one destination task process same partition(s) only works with one bipartite source."));
        }
    }

    @Test(timeout = 5000)
    public void testReduceSchedulingWithPartitionStats() throws Exception {
        HashMap hashMap = new HashMap();
        testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.REDUCE_PARALLELISM, 300, new long[]{MB, 2 * MB, 5 * MB}, 2, 2, 2, hashMap);
        EdgeManagerPluginOnDemand next = hashMap.values().iterator().next();
        Assert.assertEquals(600L, next.getNumDestinationTaskPhysicalInputs(0));
        for (int i = 0; i < 300; i++) {
            for (int i2 = 0; i2 < 2; i2++) {
                if (i2 == 0) {
                    EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination = next.routeCompositeDataMovementEventToDestination(i, 0);
                    Assert.assertEquals(2L, routeCompositeDataMovementEventToDestination.getCount());
                    Assert.assertEquals(0L, routeCompositeDataMovementEventToDestination.getSource());
                    Assert.assertEquals(i * 2, routeCompositeDataMovementEventToDestination.getTarget());
                } else {
                    EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination = next.routeInputSourceTaskFailedEventToDestination(i, 0);
                    Assert.assertEquals(2L, routeInputSourceTaskFailedEventToDestination.getNumEvents());
                    Assert.assertArrayEquals(new int[]{0 + (i * 2), 1 + (i * 2)}, routeInputSourceTaskFailedEventToDestination.getTargetIndices());
                }
            }
        }
    }

    @Test(timeout = 5000)
    public void testFairSchedulingWithPartitionStats() throws Exception {
        HashMap hashMap = new HashMap();
        testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, 300, new long[]{MB, 2 * MB, 5 * MB}, 2, 3, 2, hashMap);
        EdgeManagerPluginOnDemand next = hashMap.values().iterator().next();
        Assert.assertEquals(600L, next.getNumDestinationTaskPhysicalInputs(0));
        for (int i = 0; i < 300; i++) {
            for (int i2 = 0; i2 < 2; i2++) {
                if (i2 == 0) {
                    EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination = next.routeCompositeDataMovementEventToDestination(i, 0);
                    Assert.assertEquals(2L, routeCompositeDataMovementEventToDestination.getCount());
                    Assert.assertEquals(0L, routeCompositeDataMovementEventToDestination.getSource());
                    Assert.assertEquals(i * 2, routeCompositeDataMovementEventToDestination.getTarget());
                } else {
                    EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination = next.routeInputSourceTaskFailedEventToDestination(i, 0);
                    Assert.assertEquals(2L, routeInputSourceTaskFailedEventToDestination.getNumEvents());
                    Assert.assertArrayEquals(new int[]{0 + (i * 2), 1 + (i * 2)}, routeInputSourceTaskFailedEventToDestination.getTargetIndices());
                }
            }
        }
        Assert.assertEquals(150L, next.getNumDestinationTaskPhysicalInputs(1));
        for (int i3 = 0; i3 < 2; i3++) {
            if (i3 == 0) {
                EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination2 = next.routeCompositeDataMovementEventToDestination(0, 1);
                Assert.assertEquals(1L, routeCompositeDataMovementEventToDestination2.getCount());
                Assert.assertEquals(2L, routeCompositeDataMovementEventToDestination2.getSource());
                Assert.assertEquals(0L, routeCompositeDataMovementEventToDestination2.getTarget());
            } else {
                EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination2 = next.routeInputSourceTaskFailedEventToDestination(0, 1);
                Assert.assertEquals(1L, routeInputSourceTaskFailedEventToDestination2.getNumEvents());
                Assert.assertEquals(0L, routeInputSourceTaskFailedEventToDestination2.getTargetIndices()[0]);
            }
        }
        Assert.assertEquals(150L, next.getNumDestinationTaskPhysicalInputs(2));
        for (int i4 = 150; i4 < 300; i4++) {
            for (int i5 = 0; i5 < 2; i5++) {
                if (i5 == 0) {
                    EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination3 = next.routeCompositeDataMovementEventToDestination(i4, 2);
                    Assert.assertEquals(1L, routeCompositeDataMovementEventToDestination3.getCount());
                    Assert.assertEquals(2L, routeCompositeDataMovementEventToDestination3.getSource());
                    Assert.assertEquals(i4 - 150, routeCompositeDataMovementEventToDestination3.getTarget());
                } else {
                    EdgeManagerPluginOnDemand.EventRouteMetadata routeInputSourceTaskFailedEventToDestination3 = next.routeInputSourceTaskFailedEventToDestination(i4, 2);
                    Assert.assertEquals(1L, routeInputSourceTaskFailedEventToDestination3.getNumEvents());
                    Assert.assertEquals(i4 - 150, routeInputSourceTaskFailedEventToDestination3.getTargetIndices()[0]);
                }
            }
        }
    }

    @Test(timeout = 500000)
    public void testOverflow() throws Exception {
        testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, 30000, new long[]{1 * MB, 2 * MB, 500 * MB}, 1000, 15090, 3, new HashMap());
    }

    private void testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType fairRoutingType, int i, long[] jArr, int i2, int i3, int i4, Map<String, EdgeManagerPlugin> map) throws Exception {
        Configuration configuration = new Configuration();
        HashMap hashMap = new HashMap();
        EdgeProperty create = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        EdgeProperty create2 = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        EdgeProperty create3 = EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, OutputDescriptor.create("out"), InputDescriptor.create("in"));
        hashMap.put("R1", create);
        hashMap.put("M2", create2);
        hashMap.put("M3", create3);
        VertexManagerPluginContext vertexManagerPluginContext = (VertexManagerPluginContext) Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(vertexManagerPluginContext.getInputVertexEdgeProperties()).thenReturn(hashMap);
        Mockito.when(vertexManagerPluginContext.getVertexName()).thenReturn("R2");
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("R2"))).thenReturn(3);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("R1"))).thenReturn(Integer.valueOf(i));
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("M2"))).thenReturn(3);
        Mockito.when(Integer.valueOf(vertexManagerPluginContext.getVertexNumTasks("M3"))).thenReturn(3);
        LinkedList newLinkedList = Lists.newLinkedList();
        ((VertexManagerPluginContext) Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(newLinkedList)).when(vertexManagerPluginContext)).scheduleTasks(Mockito.anyList());
        ((VertexManagerPluginContext) Mockito.doAnswer(new TestShuffleVertexManagerUtils.reconfigVertexAnswer(vertexManagerPluginContext, "R2", map)).when(vertexManagerPluginContext)).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint) Mockito.any(), Mockito.anyMap());
        FairShuffleVertexManager createFairShuffleVertexManager = createFairShuffleVertexManager(configuration, vertexManagerPluginContext, fairRoutingType, Long.valueOf(1000 * MB), Float.valueOf(0.001f), Float.valueOf(0.001f));
        createFairShuffleVertexManager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue(createFairShuffleVertexManager.bipartiteSources == 1);
        createFairShuffleVertexManager.onVertexStateUpdated(new VertexStateUpdate("R1", VertexState.CONFIGURED));
        createFairShuffleVertexManager.onVertexStateUpdated(new VertexStateUpdate("M2", VertexState.CONFIGURED));
        Assert.assertEquals(3L, createFairShuffleVertexManager.pendingTasks.size());
        Assert.assertEquals(i, createFairShuffleVertexManager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0L, createFairShuffleVertexManager.numBipartiteSourceTasksCompleted);
        Assert.assertTrue(createFairShuffleVertexManager.pendingTasks.size() == 3);
        Assert.assertTrue(createFairShuffleVertexManager.totalNumBipartiteSourceTasks == i);
        for (int i5 = 0; i5 < i2; i5++) {
            VertexManagerEvent vertexManagerEvent = getVertexManagerEvent(jArr, 0L, "R1", true);
            createFairShuffleVertexManager.onSourceTaskCompleted(vertexManagerEvent.getProducerAttemptIdentifier());
            createFairShuffleVertexManager.onVertexManagerEventReceived(vertexManagerEvent);
        }
        createFairShuffleVertexManager.onSourceTaskCompleted(createTaskAttemptIdentifier("M2", 0));
        Assert.assertTrue(createFairShuffleVertexManager.pendingTasks.size() == 3);
        Assert.assertTrue(createFairShuffleVertexManager.totalNumBipartiteSourceTasks == i);
        createFairShuffleVertexManager.onVertexStateUpdated(new VertexStateUpdate("M3", VertexState.CONFIGURED));
        createFairShuffleVertexManager.onSourceTaskCompleted(createTaskAttemptIdentifier("M3", 0));
        Assert.assertTrue(createFairShuffleVertexManager.pendingTasks.size() == 0);
        Assert.assertTrue(newLinkedList.size() == i3);
        Assert.assertEquals(1L, map.size());
        EdgeManagerPluginOnDemand next = map.values().iterator().next();
        for (int i6 = 0; i6 < i; i6++) {
            Assert.assertEquals(3L, next.getNumSourceTaskPhysicalOutputs(0));
        }
        for (int i7 = 0; i7 < i; i7++) {
            Assert.assertEquals(i4, next.getNumDestinationConsumerTasks(i7));
        }
    }

    private static FairShuffleVertexManager createManager(Configuration configuration, VertexManagerPluginContext vertexManagerPluginContext, Float f, Float f2) {
        return createManager(configuration, vertexManagerPluginContext, true, Long.valueOf(1000 * MB), f, f2);
    }

    private static FairShuffleVertexManager createManager(Configuration configuration, VertexManagerPluginContext vertexManagerPluginContext, Boolean bool, Long l, Float f, Float f2) {
        return TestShuffleVertexManagerBase.createManager(FairShuffleVertexManager.class, configuration, vertexManagerPluginContext, bool, l, f, f2);
    }
}
