/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerBase;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerUtils;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleVertexManager
extends TestShuffleVertexManagerUtils {
    List<TaskAttemptIdentifier> emptyCompletions = null;

    @Test(timeout=5000L)
    public void testLargeDataSize() throws IOException {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        LinkedList scheduledTasks = Lists.newLinkedList();
        HashMap<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>();
        VertexManagerPluginContext mockContext = this.createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, newEdgeManagers);
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(null, 5000L, "Vertex1");
        ShuffleVertexManager manager = TestShuffleVertexManager.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.pendingTasks.size() == 4 ? 1 : 0) != 0);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex1", 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex2", 0));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)0))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)0))).doneReconfiguringVertex();
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)0))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).doneReconfiguringVertex();
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)scheduledTasks.size());
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)5000L, (long)manager.completedSourceTasksOutputSize);
        scheduledTasks.clear();
        manager = TestShuffleVertexManager.createManager(conf, mockContext, true, 0x5555555555555400L, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex1", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex2", VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate("Vertex3", VertexState.CONFIGURED));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)manager.totalNumBipartiteSourceTasks);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex3", 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        vmEvent = this.getVertexManagerEvent(null, 0L, "Vertex1");
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex1", 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)1L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)0L, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 0L, "Vertex1");
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex1", 1));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)0L, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 0x3FFFFFFFFFFFFFFFL, "Vertex2");
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex2", 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)3L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)0x3FFFFFFFFFFFFFFFL, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 0x3FFFFFFFFFFFFFFFL, "Vertex2");
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier("Vertex2", 1));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).reconfigureVertex(Mockito.eq((int)2), (VertexLocationHint)Mockito.any(), Mockito.anyMap());
        Assert.assertEquals((long)2L, (long)newEdgeManagers.size());
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)2L, (long)scheduledTasks.size());
        Assert.assertTrue((boolean)scheduledTasks.contains(new Integer(0)));
        Assert.assertTrue((boolean)scheduledTasks.contains(new Integer(1)));
        Assert.assertEquals((long)4L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)4L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)0x7FFFFFFFFFFFFFFEL, (long)manager.completedSourceTasksOutputSize);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex1")).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex2")).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        scheduledTasks.clear();
    }

    @Test(timeout=5000L)
    public void testAutoParallelismConfig() throws Exception {
        LinkedList scheduledTasks = Lists.newLinkedList();
        VertexManagerPluginContext mockContext = this.createVertexManagerContext("Vertex1", 2, "Vertex2", 2, "Vertex3", 2, "Vertex4", 4, scheduledTasks, null);
        ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer = ShuffleVertexManager.createConfigBuilder(null);
        VertexManagerPluginDescriptor pluginDesc = configurer.setAutoReduceParallelism(true).setDesiredTaskInputSize(1000L).setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build();
        Mockito.when((Object)mockContext.getUserPayload()).thenReturn((Object)pluginDesc.getUserPayload());
        ShuffleVertexManager manager = (ShuffleVertexManager)ReflectionUtils.createClazzInstance((String)pluginDesc.getClassName(), (Class[])new Class[]{VertexManagerPluginContext.class}, (Object[])new Object[]{mockContext});
        manager.initialize();
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((boolean)manager.config.isAutoParallelismEnabled());
        Assert.assertTrue((manager.config.getDesiredTaskInputDataSize() == 1000L ? 1 : 0) != 0);
        Assert.assertTrue((manager.mgrConfig.getMinTaskParallelism() == 10 ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMinFraction() == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMaxFraction() == 0.5f ? 1 : 0) != 0);
        configurer = ShuffleVertexManager.createConfigBuilder(null);
        pluginDesc = configurer.setAutoReduceParallelism(false).build();
        Mockito.when((Object)mockContext.getUserPayload()).thenReturn((Object)pluginDesc.getUserPayload());
        manager = (ShuffleVertexManager)ReflectionUtils.createClazzInstance((String)pluginDesc.getClassName(), (Class[])new Class[]{VertexManagerPluginContext.class}, (Object[])new Object[]{mockContext});
        manager.initialize();
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((!manager.config.isAutoParallelismEnabled() ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getDesiredTaskInputDataSize() == ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT ? 1 : 0) != 0);
        Assert.assertTrue((manager.mgrConfig.getMinTaskParallelism() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMinFraction() == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.config.getMaxFraction() == 0.75f ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void testSchedulingWithPartitionStats() throws IOException {
        Configuration conf = new Configuration();
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "R2";
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        LinkedList scheduledTasks = Lists.newLinkedList();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks)).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        ShuffleVertexManager manager = TestShuffleVertexManager.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 1 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)3L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        long[] sizes = new long[]{100000000L, 0L, 5000000000L};
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(sizes, 1060000000L, r1);
        manager.onVertexManagerEventReceived(vmEvent);
        sizes = new long[]{0L, 0L, 0L};
        vmEvent = this.getVertexManagerEvent(sizes, 1060000000L, r1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(0) == 2 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(1) == 0 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(2) == 1 ? 1 : 0) != 0);
    }

    private static ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) {
        return TestShuffleVertexManager.createManager(conf, context, true, 1000L, min, max);
    }

    private static ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Boolean enableAutoParallelism, Long desiredTaskInputSize, Float min, Float max) {
        return (ShuffleVertexManager)TestShuffleVertexManagerBase.createManager(ShuffleVertexManager.class, conf, context, enableAutoParallelism, desiredTaskInputSize, min, max);
    }
}

