/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerBase;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerUtils;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestFairShuffleVertexManager
extends TestShuffleVertexManagerUtils {
    List<TaskAttemptIdentifier> emptyCompletions = null;

    @Test(timeout=5000L)
    public void testAutoParallelismConfig() throws Exception {
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = this.createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, null);
        FairShuffleVertexManager manager = TestFairShuffleVertexManager.createManager(null, mockContext, null, Float.valueOf(0.5f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((boolean)manager.config.isAutoParallelismEnabled());
        Assert.assertTrue((manager.config.getDesiredTaskInputDataSize() == 1000L * MB ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMinFraction() == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMaxFraction() == 0.5f ? 1 : 0) != 0);
        manager = TestFairShuffleVertexManager.createManager(null, mockContext, null, null, null, null);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((!manager.config.isAutoParallelismEnabled() ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getDesiredTaskInputDataSize() == FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMinFraction() == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMaxFraction() == 0.75f ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void testInvalidSetup() {
        Configuration conf = new Configuration();
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = this.createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, null);
        try {
            FairShuffleVertexManager manager = TestFairShuffleVertexManager.createFairShuffleVertexManager(conf, mockContext, FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, 1000L * MB, Float.valueOf(0.001f), Float.valueOf(0.001f));
            manager.onVertexStarted(this.emptyCompletions);
            Assert.assertFalse((boolean)true);
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Having more than one destination task process same partition(s) only works with one bipartite source."));
        }
    }

    @Test(timeout=5000L)
    public void testReduceSchedulingWithPartitionStats() throws Exception {
        int numScatherAndGatherSourceTasks = 300;
        HashMap<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>();
        long[] partitionStats = new long[]{MB, 2L * MB, 5L * MB};
        this.testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.REDUCE_PARALLELISM, 300, partitionStats, 2, 2, 2, newEdgeManagers);
        EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next();
        Assert.assertEquals((long)600L, (long)edgeManager.getNumDestinationTaskPhysicalInputs(0));
        for (int sourceTaskIndex = 0; sourceTaskIndex < 300; ++sourceTaskIndex) {
            for (int j = 0; j < 2; ++j) {
                EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata;
                if (j == 0) {
                    routeMetadata = edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0);
                    Assert.assertEquals((long)2L, (long)routeMetadata.getCount());
                    Assert.assertEquals((long)0L, (long)routeMetadata.getSource());
                    Assert.assertEquals((long)(sourceTaskIndex * 2), (long)routeMetadata.getTarget());
                    continue;
                }
                routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0);
                Assert.assertEquals((long)2L, (long)routeMetadata.getNumEvents());
                Assert.assertArrayEquals((int[])new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, (int[])routeMetadata.getTargetIndices());
            }
        }
    }

    @Test(timeout=5000L)
    public void testFairSchedulingWithPartitionStats() throws Exception {
        EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata;
        int sourceTaskIndex;
        int numScatherAndGatherSourceTasks = 300;
        HashMap<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>();
        long[] partitionStats = new long[]{MB, 2L * MB, 5L * MB};
        this.testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, 300, partitionStats, 2, 3, 2, newEdgeManagers);
        EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next();
        Assert.assertEquals((long)600L, (long)edgeManager.getNumDestinationTaskPhysicalInputs(0));
        for (sourceTaskIndex = 0; sourceTaskIndex < 300; ++sourceTaskIndex) {
            for (int j = 0; j < 2; ++j) {
                if (j == 0) {
                    routeMetadata = edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 0);
                    Assert.assertEquals((long)2L, (long)routeMetadata.getCount());
                    Assert.assertEquals((long)0L, (long)routeMetadata.getSource());
                    Assert.assertEquals((long)(sourceTaskIndex * 2), (long)routeMetadata.getTarget());
                    continue;
                }
                routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 0);
                Assert.assertEquals((long)2L, (long)routeMetadata.getNumEvents());
                Assert.assertArrayEquals((int[])new int[]{0 + sourceTaskIndex * 2, 1 + sourceTaskIndex * 2}, (int[])routeMetadata.getTargetIndices());
            }
        }
        Assert.assertEquals((long)150L, (long)edgeManager.getNumDestinationTaskPhysicalInputs(1));
        for (int j = 0; j < 2; ++j) {
            EdgeManagerPluginOnDemand.CompositeEventRouteMetadata routeMetadata2;
            if (j == 0) {
                routeMetadata2 = edgeManager.routeCompositeDataMovementEventToDestination(0, 1);
                Assert.assertEquals((long)1L, (long)routeMetadata2.getCount());
                Assert.assertEquals((long)2L, (long)routeMetadata2.getSource());
                Assert.assertEquals((long)0L, (long)routeMetadata2.getTarget());
                continue;
            }
            routeMetadata2 = edgeManager.routeInputSourceTaskFailedEventToDestination(0, 1);
            Assert.assertEquals((long)1L, (long)routeMetadata2.getNumEvents());
            Assert.assertEquals((long)0L, (long)routeMetadata2.getTargetIndices()[0]);
        }
        Assert.assertEquals((long)150L, (long)edgeManager.getNumDestinationTaskPhysicalInputs(2));
        for (sourceTaskIndex = 150; sourceTaskIndex < 300; ++sourceTaskIndex) {
            for (int j = 0; j < 2; ++j) {
                if (j == 0) {
                    routeMetadata = edgeManager.routeCompositeDataMovementEventToDestination(sourceTaskIndex, 2);
                    Assert.assertEquals((long)1L, (long)routeMetadata.getCount());
                    Assert.assertEquals((long)2L, (long)routeMetadata.getSource());
                    Assert.assertEquals((long)(sourceTaskIndex - 150), (long)routeMetadata.getTarget());
                    continue;
                }
                routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex, 2);
                Assert.assertEquals((long)1L, (long)routeMetadata.getNumEvents());
                Assert.assertEquals((long)(sourceTaskIndex - 150), (long)routeMetadata.getTargetIndices()[0]);
            }
        }
    }

    @Test(timeout=500000L)
    public void testOverflow() throws Exception {
        int numScatherAndGatherSourceTasks = 30000;
        HashMap<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>();
        boolean firstPartitionSize = true;
        int secondPartitionSize = 2;
        int thirdPartitionSize = 500;
        long[] partitionStats = new long[]{1L * MB, 2L * MB, 500L * MB};
        int expectedDestinationTasks = 15090;
        this.testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType.FAIR_PARALLELISM, 30000, partitionStats, 1000, 15090, 3, newEdgeManagers);
    }

    private void testSchedulingWithPartitionStats(FairShuffleVertexManager.FairRoutingType fairRoutingType, int numTasks, long[] partitionStats, int numCompletedEvents, int expectedScheduledTasks, int expectedNumDestinationConsumerTasks, Map<String, EdgeManagerPlugin> newEdgeManagers) throws Exception {
        Configuration conf = new Configuration();
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        int numOfTasksInr1 = numTasks;
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m2 = "M2";
        int numOfTasksInM2 = 3;
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m3 = "M3";
        int numOfTasksInM3 = 3;
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "R2";
        int numOfTasksInDestination = 3;
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)numOfTasksInr1);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        LinkedList scheduledTasks = Lists.newLinkedList();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks)).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new TestShuffleVertexManagerUtils.reconfigVertexAnswer(mockContext, "R2", newEdgeManagers)).when((Object)mockContext)).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(), Mockito.anyMap());
        FairShuffleVertexManager manager = TestFairShuffleVertexManager.createFairShuffleVertexManager(conf, mockContext, fairRoutingType, 1000L * MB, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 1 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)numOfTasksInr1, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == numOfTasksInr1 ? 1 : 0) != 0);
        for (int i = 0; i < numCompletedEvents; ++i) {
            VertexManagerEvent vmEvent = this.getVertexManagerEvent(partitionStats, 0L, r1, true);
            manager.onSourceTaskCompleted(vmEvent.getProducerAttemptIdentifier());
            manager.onVertexManagerEventReceived(vmEvent);
        }
        manager.onSourceTaskCompleted(TestFairShuffleVertexManager.createTaskAttemptIdentifier(m2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == numOfTasksInr1 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestFairShuffleVertexManager.createTaskAttemptIdentifier(m3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == expectedScheduledTasks ? 1 : 0) != 0);
        Assert.assertEquals((long)1L, (long)newEdgeManagers.size());
        EdgeManagerPluginOnDemand edgeManager = (EdgeManagerPluginOnDemand)newEdgeManagers.values().iterator().next();
        for (int i = 0; i < numOfTasksInr1; ++i) {
            Assert.assertEquals((long)3L, (long)edgeManager.getNumSourceTaskPhysicalOutputs(0));
        }
        for (int sourceTaskIndex = 0; sourceTaskIndex < numOfTasksInr1; ++sourceTaskIndex) {
            Assert.assertEquals((long)expectedNumDestinationConsumerTasks, (long)edgeManager.getNumDestinationConsumerTasks(sourceTaskIndex));
        }
    }

    private static FairShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) {
        return TestFairShuffleVertexManager.createManager(conf, context, true, 1000L * MB, min, max);
    }

    private static FairShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Boolean enableAutoParallelism, Long desiredTaskInputSize, Float min, Float max) {
        return (FairShuffleVertexManager)TestShuffleVertexManagerBase.createManager(FairShuffleVertexManager.class, conf, context, enableAutoParallelism, desiredTaskInputSize, min, max);
    }
}

