/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.cartesianproduct;

import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload;
import org.apache.tez.runtime.library.cartesianproduct.FairCartesianProductVertexManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

public class TestFairCartesianProductVertexManager {
    /** Captures the edge-property map passed to {@code reconfigureVertex}. */
    @Captor
    private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;
    /** Captures the request list passed to {@code scheduleTasks}. */
    @Captor
    private ArgumentCaptor<List<VertexManagerPluginContext.ScheduleTaskRequest>> scheduleRequestCaptor;
    /** Vertex manager under test; rebuilt for every test method. */
    private FairCartesianProductVertexManager vertexManager;
    /** Mocked plugin context handed to the vertex manager. */
    private VertexManagerPluginContext ctx;
    /** Handle returned by {@code MockitoAnnotations.openMocks}; closed in {@link #tearDown()}. */
    private AutoCloseable mockitoHandle;

    /**
     * Initializes the {@code @Captor} fields, creates a fresh mocked context and
     * builds a new vertex manager on top of it.
     */
    @Before
    public void setup() {
        // Keep the handle so the annotation-created resources can be released after the test.
        mockitoHandle = MockitoAnnotations.openMocks(this);
        ctx = Mockito.mock(VertexManagerPluginContext.class);
        vertexManager = new FairCartesianProductVertexManager(ctx);
    }

    /**
     * Releases the resources opened by {@code MockitoAnnotations.openMocks} in {@link #setup()}.
     *
     * @throws Exception if closing the Mockito handle fails
     */
    @After
    public void tearDown() throws Exception {
        if (mockitoHandle != null) {
            mockitoHandle.close();
            mockitoHandle = null;
        }
    }

    /**
     * Stubs a DAG with two plain source vertices ("v0" and "v1") and initializes the
     * vertex manager with an unpartitioned cartesian-product config.
     *
     * @param maxParallelism value passed to {@code setMaxParallelism}
     * @param minOpsPerWorker value passed to {@code setMinOpsPerWorker}
     * @param numPartition value passed to {@code setNumPartitionsForFairCase}
     * @param srcParallelismMultiplier factor applied to the base task counts 2 and 3
     * @throws Exception if initializing the vertex manager fails
     */
    private void setupDAGVertexOnly(int maxParallelism, long minOpsPerWorker, int numPartition, int srcParallelismMultiplier) throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("v1");
        configBuilder.setMaxParallelism(maxParallelism);
        configBuilder.setMinOpsPerWorker(minOpsPerWorker);
        configBuilder.setNumPartitionsForFairCase(numPartition);
        vertexManager.initialize(configBuilder.build());
    }

    /**
     * Stubs a DAG with two cartesian-product sources ("v0", "v1") plus one extra
     * input "v2" connected through a BROADCAST edge, then initializes the vertex manager.
     * Only "v0" and "v1" are listed as sources in the config.
     *
     * @param maxParallelism used for both {@code setMaxParallelism} and {@code setNumPartitionsForFairCase}
     * @param minWorkloadPerWorker value passed to {@code setMinOpsPerWorker}
     * @param srcParallelismMultiplier factor applied to the base task counts 2, 3 and 5
     * @throws Exception if initializing the vertex manager fails
     */
    private void setupDAGVertexOnlyWithBroadcast(int maxParallelism, long minWorkloadPerWorker, int srcParallelismMultiplier) throws Exception {
        EdgeProperty broadcastEdge =
            EdgeProperty.create(EdgeProperty.DataMovementType.BROADCAST, null, null, null, null);
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        inputEdges.put("v2", broadcastEdge);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 5);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("v1");
        configBuilder.setMaxParallelism(maxParallelism);
        configBuilder.setMinOpsPerWorker(minWorkloadPerWorker);
        configBuilder.setNumPartitionsForFairCase(maxParallelism);
        vertexManager.initialize(configBuilder.build());
    }

    /**
     * Stubs a DAG whose sources are the plain vertex "v0" and the vertex group "g0"
     * (made of "v1" and "v2"), then initializes the vertex manager.
     *
     * @param maxParallelism used for both {@code setNumPartitionsForFairCase} and {@code setMaxParallelism}
     * @param minWorkloadPerWorker value passed to {@code setMinOpsPerWorker}
     * @param srcParallelismMultiplier factor applied to the base task counts 2, 3 and 4
     * @throws Exception if initializing the vertex manager fails
     */
    private void setupDAGVertexGroup(int maxParallelism, long minWorkloadPerWorker, int srcParallelismMultiplier) throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(3);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4);

        Map<String, List<String>> groups = new HashMap<String, List<String>>();
        groups.put("g0", Arrays.asList("v1", "v2"));
        Mockito.when(ctx.getInputVertexGroups()).thenReturn(groups);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("g0");
        configBuilder.setNumPartitionsForFairCase(maxParallelism);
        configBuilder.setMaxParallelism(maxParallelism);
        configBuilder.setMinOpsPerWorker(minWorkloadPerWorker);
        vertexManager.initialize(configBuilder.build());
    }

    /**
     * Stubs a DAG whose sources are two vertex groups, "g0" (v0, v1) and "g1" (v2, v3),
     * then initializes the vertex manager.
     *
     * @param maxParallelism used for both {@code setNumPartitionsForFairCase} and {@code setMaxParallelism}
     * @param minWorkloadPerWorker value passed to {@code setMinOpsPerWorker}
     * @param srcParallelismMultiplier factor applied to the base task counts 2, 3, 4 and 5
     * @throws Exception if initializing the vertex manager fails
     */
    private void setupDAGVertexGroupOnly(int maxParallelism, long minWorkloadPerWorker, int srcParallelismMultiplier) throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(4);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, srcParallelismMultiplier, 2, 3, 4, 5);

        Map<String, List<String>> groups = new HashMap<String, List<String>>();
        groups.put("g0", Arrays.asList("v0", "v1"));
        groups.put("g1", Arrays.asList("v2", "v3"));
        Mockito.when(ctx.getInputVertexGroups()).thenReturn(groups);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("g0");
        configBuilder.addSources("g1");
        configBuilder.setNumPartitionsForFairCase(maxParallelism);
        configBuilder.setMaxParallelism(maxParallelism);
        configBuilder.setMinOpsPerWorker(minWorkloadPerWorker);
        vertexManager.initialize(configBuilder.build());
    }

    /**
     * Builds a mutable map with one entry per source vertex, keyed "v0" .. "v(numSrcV-1)".
     * Each value is an edge property whose edge manager is {@link CartesianProductEdgeManager}.
     *
     * @param numSrcV number of source vertices to create entries for
     * @return the freshly created map
     */
    private Map<String, EdgeProperty> getEdgePropertyMap(int numSrcV) {
        String edgeManagerClass = CartesianProductEdgeManager.class.getName();
        Map<String, EdgeProperty> edges = new HashMap<String, EdgeProperty>();
        int vertexIdx = 0;
        while (vertexIdx < numSrcV) {
            // A separate descriptor instance is created for every edge, as before.
            EdgeManagerPluginDescriptor descriptor = EdgeManagerPluginDescriptor.create(edgeManagerClass);
            edges.put("v" + vertexIdx, EdgeProperty.create(descriptor, null, null, null, null));
            vertexIdx++;
        }
        return edges;
    }

    /**
     * Stubs {@code getVertexNumTasks} on the given context so that vertex "v&lt;k&gt;"
     * reports {@code numTasks[k] * multiplier} tasks.
     *
     * @param ctx mocked context to stub
     * @param multiplier factor applied to every entry of {@code numTasks}
     * @param numTasks base task count per vertex, in vertex-index order
     */
    private void setSrcParallelism(VertexManagerPluginContext ctx, int multiplier, int ... numTasks) {
        for (int vertexIdx = 0; vertexIdx < numTasks.length; vertexIdx++) {
            String vertexName = "v" + vertexIdx;
            int parallelism = numTasks[vertexIdx] * multiplier;
            Mockito.when(ctx.getVertexNumTasks(Mockito.eq(vertexName))).thenReturn(parallelism);
        }
    }

    /**
     * Creates a task-attempt identifier for attempt 0 of the given task, under DAG name "dag".
     *
     * @param vertexName name of the vertex the task belongs to
     * @param taskId task index within the vertex
     * @return the identifier wrapping the generated attempt id
     */
    private TaskAttemptIdentifier getTaId(String vertexName, int taskId) {
        TezDAGID dagId = TezDAGID.getInstance("0", 0, 0);
        TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
        TezTaskID tezTaskId = TezTaskID.getInstance(vertexId, taskId);
        TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(tezTaskId, 0);
        return new TaskAttemptIdentifierImpl("dag", vertexName, attemptId);
    }

    /**
     * Creates a vertex-manager event targeted at "cp vertex" whose payload carries the
     * given record count, and tags it with the producing task attempt.
     *
     * @param numRecord record count stored in the payload
     * @param vName name of the producing vertex
     * @param taskId index of the producing task
     * @return the populated event
     */
    private VertexManagerEvent getVMEvent(long numRecord, String vName, int taskId) {
        ShuffleUserPayloads.VertexManagerEventPayloadProto payload =
            ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setNumRecord(numRecord)
                .build();
        ByteBuffer serialized = payload.toByteString().asReadOnlyByteBuffer();
        VertexManagerEvent event = VertexManagerEvent.create("cp vertex", serialized);
        event.setProducerAttemptIdentifier(getTaId(vName, taskId));
        return event;
    }

    /**
     * Parses the cartesian-product config out of the edge manager's user payload and
     * asserts its source list, chunk counts and max parallelism.
     *
     * @param edgeProperty edge whose edge-manager payload is inspected
     * @param sources expected source names, in order
     * @param numChunksPerSrc expected chunk count per source, in order
     * @param maxParallelism expected max parallelism
     * @throws InvalidProtocolBufferException if the payload cannot be parsed
     */
    private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] sources, int[] numChunksPerSrc, int maxParallelism) throws InvalidProtocolBufferException {
        ByteBuffer rawPayload = edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload();
        CartesianProductUserPayload.CartesianProductConfigProto parsed =
            CartesianProductUserPayload.CartesianProductConfigProto.parseFrom(ByteString.copyFrom(rawPayload));

        Object[] actualSources = parsed.getSourcesList().toArray();
        int[] actualChunks = Ints.toArray(parsed.getNumChunksList());

        Assert.assertArrayEquals(sources, actualSources);
        Assert.assertArrayEquals(numChunksPerSrc, actualChunks);
        Assert.assertEquals(maxParallelism, parsed.getMaxParallelism());
    }

    /**
     * Parses the cartesian-product config out of the edge manager's user payload and
     * asserts the vertex-group fields: position in group and the leading
     * per-vertex task counts.
     *
     * @param edgeProperty edge whose edge-manager payload is inspected
     * @param positionInGroup expected position of the vertex inside its group
     * @param numTaskPerVertexInGroup expected task counts, compared index by index; may be empty
     * @throws InvalidProtocolBufferException if the payload cannot be parsed
     */
    private void verifyVertexGroupInfo(EdgeProperty edgeProperty, int positionInGroup, int ... numTaskPerVertexInGroup) throws InvalidProtocolBufferException {
        ByteBuffer rawPayload = edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload();
        CartesianProductUserPayload.CartesianProductConfigProto parsed =
            CartesianProductUserPayload.CartesianProductConfigProto.parseFrom(ByteString.copyFrom(rawPayload));

        Assert.assertEquals(positionInGroup, parsed.getPositionInGroup());
        for (int idx = 0; idx < numTaskPerVertexInGroup.length; idx++) {
            Assert.assertEquals(numTaskPerVertexInGroup[idx], parsed.getNumTaskPerVertexInGroup(idx));
        }
    }

    /**
     * Verifies that {@code scheduleTasks} has been invoked on the mocked context exactly
     * {@code expectedTimes} times and, when that count is positive, that the request list
     * returned by the captor's {@code getValue()} starts with the expected task indices.
     *
     * <p>Only the first {@code expectedTid.length} requests are compared; extra trailing
     * requests are not checked.
     *
     * @param expectedTimes expected number of {@code scheduleTasks} invocations
     * @param expectedTid expected task indices, compared position by position; may be empty
     */
    private void verifyScheduleRequest(int expectedTimes, int ... expectedTid) {
        Mockito.verify(ctx, Mockito.times(expectedTimes)).scheduleTasks(scheduleRequestCaptor.capture());
        if (expectedTimes <= 0) {
            return;
        }
        List<VertexManagerPluginContext.ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
        // Fail with a readable message instead of an IndexOutOfBoundsException from get(i)
        // when fewer tasks were scheduled than the test expects.
        Assert.assertTrue(
            "expected at least " + expectedTid.length + " scheduled tasks but got " + requests.size(),
            requests.size() >= expectedTid.length);
        for (int pos = 0; pos < expectedTid.length; pos++) {
            Assert.assertEquals("task index at position " + pos,
                expectedTid[pos], requests.get(pos).getTaskIndex());
        }
    }

    /**
     * Two plain sources with max parallelism 30: reconfiguration must not happen after
     * only v0 has reported, and must happen once (parallelism 30, chunks {5, 6}) after
     * v1 reports too. Then checks which tasks get scheduled as source tasks complete.
     */
    @Test(timeout=5000L)
    public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception {
        String[] sourceNames = {"v0", "v1"};
        int[] expectedChunks = {5, 6};

        setupDAGVertexOnly(30, 1L, 30, 1);
        for (String source : sourceNames) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate(source, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), (VertexLocationHint) Mockito.any(), Mockito.anyMap());

        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));
        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(30), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());

        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        for (String source : sourceNames) {
            EdgeProperty edge = reconfiguredEdges.get(source);
            verifyEdgeProperties(edge, sourceNames, expectedChunks, 30);
            verifyVertexGroupInfo(edge, 0);
        }

        vertexManager.onVertexStarted(null);
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(1, 0, 6, 1, 7);

        vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
        verifyScheduleRequest(2, 12, 13, 18, 19, 24, 25);
    }

    /**
     * Two plain sources (20 and 30 tasks) with min ops per worker 10000: expects a single
     * reconfiguration to parallelism 12 with chunks {4, 3}, then one schedule call after
     * a subset of source tasks completes.
     */
    @Test(timeout=5000L)
    public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception {
        String[] sourceNames = {"v0", "v1"};
        int[] expectedChunks = {4, 3};

        setupDAGVertexOnly(100, 10000L, 10, 10);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        for (int task = 0; task < 20; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v0", task));
        }
        for (int task = 0; task < 30; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(10L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(12), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        verifyEdgeProperties(reconfiguredEdges.get("v0"), sourceNames, expectedChunks, 100);
        verifyEdgeProperties(reconfiguredEdges.get("v1"), sourceNames, expectedChunks, 100);

        vertexManager.onVertexStarted(null);
        verifyScheduleRequest(0);

        for (int task = 0; task < 5; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        // v1 completions cover task indices 10..19.
        for (int task = 10; task < 20; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }
        verifyScheduleRequest(1, 1);
    }

    /**
     * One plain source (v0) crossed with one vertex group (g0 = v1, v2): expects a single
     * reconfiguration to parallelism 100 with chunks {20, 5}, and checks the scheduled
     * task indices as tasks of the group members complete.
     */
    @Test(timeout=5000L)
    public void testDAGVertexGroup() throws Exception {
        String[] expectedSources = {"v0", "g0"};
        int[] expectedChunks = {20, 5};

        setupDAGVertexGroup(100, 1L, 1);
        for (int vertexIdx = 0; vertexIdx < 3; vertexIdx++) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + vertexIdx, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(10L, "v1", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 1));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(100), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        for (int vertexIdx = 0; vertexIdx < 3; vertexIdx++) {
            verifyEdgeProperties(reconfiguredEdges.get("v" + vertexIdx), expectedSources, expectedChunks, 100);
        }

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
        verifyScheduleRequest(1, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45);

        // Completing v1's second task alone must not trigger another schedule call.
        vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
        verifyScheduleRequest(1);

        vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
        verifyScheduleRequest(2, 1, 6, 11, 16, 21, 26, 31, 36, 41, 46);
    }

    /**
     * Two vertex groups (g0 = v0, v1 and g1 = v2, v3) as sources: expects a single
     * reconfiguration to parallelism 100 with chunks {10, 10}, checks the vertex-group
     * info on every edge, and checks the first schedule request.
     */
    @Test(timeout=5000L)
    public void testDAGVertexGroupOnly() throws Exception {
        String[] expectedSources = {"g0", "g1"};
        int[] expectedChunks = {10, 10};

        setupDAGVertexGroupOnly(100, 1L, 1);
        for (int vertexIdx = 0; vertexIdx < 4; vertexIdx++) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + vertexIdx, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v1", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 1));
        vertexManager.onVertexManagerEventReceived(getVMEvent(16L, "v3", 0));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(100), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        for (int vertexIdx = 0; vertexIdx < 4; vertexIdx++) {
            verifyEdgeProperties(reconfiguredEdges.get("v" + vertexIdx), expectedSources, expectedChunks, 100);
        }

        verifyVertexGroupInfo(reconfiguredEdges.get("v0"), 0);
        verifyVertexGroupInfo(reconfiguredEdges.get("v1"), 1, 2);
        verifyVertexGroupInfo(reconfiguredEdges.get("v2"), 0);
        verifyVertexGroupInfo(reconfiguredEdges.get("v3"), 1, 4);

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v3", 1));
        verifyScheduleRequest(1, 3, 13, 23);
    }

    /**
     * Two cartesian-product sources plus a broadcast input "v2": the reconfigured edge
     * map must not contain "v2", and no task is scheduled until "v2" is reported RUNNING.
     */
    @Test(timeout=5000L)
    public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
        setupDAGVertexOnlyWithBroadcast(30, 1L, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(30), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        Assert.assertFalse(reconfiguredEdges.containsKey("v2"));

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(0);

        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
        verifyScheduleRequest(1, 0, 1, 6, 7);
    }

    /**
     * Source-task completions handed to {@code onVertexStarted} must lead to one
     * schedule call whose first request is task 0; nothing is scheduled before that.
     */
    @Test(timeout=5000L)
    public void testOnVertexStart() throws Exception {
        setupDAGVertexOnly(6, 1L, 6, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v1", 0));
        verifyScheduleRequest(0);

        List<TaskAttemptIdentifier> alreadyCompleted = Arrays.asList(getTaId("v0", 0), getTaId("v1", 0));
        vertexManager.onVertexStarted(alreadyCompleted);
        verifyScheduleRequest(1, 0);
    }

    /**
     * Drives the vertex manager with a source ("v1") that reports zero tasks.
     * The test contains no assertions; it passes when the calls below complete
     * without throwing within the timeout.
     */
    @Test(timeout=5000L)
    public void testZeroSrcTask() throws Exception {
        // Replace the context and manager created in setup() with fresh instances.
        ctx = Mockito.mock(VertexManagerPluginContext.class);
        vertexManager = new FairCartesianProductVertexManager(ctx);
        Mockito.when(ctx.getVertexNumTasks("v0")).thenReturn(2);
        Mockito.when(ctx.getVertexNumTasks("v1")).thenReturn(0);

        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .addNumChunks(2)
                .addNumChunks(3)
                .setMaxParallelism(6)
                .build();

        String edgeManagerClass = CartesianProductEdgeManager.class.getName();
        Map<String, EdgeProperty> inputEdges = new HashMap<String, EdgeProperty>();
        for (String source : new String[] {"v0", "v1"}) {
            EdgeManagerPluginDescriptor descriptor = EdgeManagerPluginDescriptor.create(edgeManagerClass);
            inputEdges.put(source, EdgeProperty.create(descriptor, null, null, null, null));
        }
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);

        vertexManager.initialize(config);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
        for (int task = 0; task < 2; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
    }

    /**
     * Shared fixture for the grouping-fraction tests: two plain sources with 20 and 30
     * tasks, max parallelism 30, 30 partitions for the fair case, min ops per worker 1
     * and grouping fraction 0.5. Both sources are reported CONFIGURED before returning.
     *
     * @throws Exception if initializing the vertex manager fails
     */
    private void setupGroupingFractionTest() throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, 10, 2, 3);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("v1");
        configBuilder.setMaxParallelism(30);
        configBuilder.setMinOpsPerWorker(1L);
        configBuilder.setNumPartitionsForFairCase(30);
        configBuilder.setGroupingFraction(0.5f);
        vertexManager.initialize(configBuilder.build());

        for (String source : new String[] {"v0", "v1"}) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate(source, VertexState.CONFIGURED));
        }
    }

    /**
     * With grouping fraction 0.5, no reconfiguration may happen after 10 of v0's and
     * 14 of v1's tasks complete; the 15th v1 completion must trigger exactly one
     * reconfiguration to parallelism 24.
     */
    @Test(timeout=5000L)
    public void testGroupingFraction() throws Exception {
        setupGroupingFractionTest();
        vertexManager.onVertexManagerEventReceived(getVMEvent(10000L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(10000L, "v1", 0));

        for (int task = 0; task < 10; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 14; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }
        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), (VertexLocationHint) Mockito.any(), Mockito.anyMap());

        vertexManager.onSourceTaskCompleted(getTaId("v1", 14));
        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(24), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
    }

    /**
     * When no vertex-manager events have been received, completing 10 of v0's and
     * 15 of v1's tasks must not trigger a reconfiguration.
     */
    @Test(timeout=5000L)
    public void testGroupFractionWithZeroStats() throws Exception {
        setupGroupingFractionTest();

        for (int task = 0; task < 10; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 15; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }

        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), (VertexLocationHint) Mockito.any(), Mockito.anyMap());
    }

    /**
     * When every source task (20 for v0, 30 for v1) completes without any
     * vertex-manager event, exactly one reconfiguration to parallelism 0 is expected.
     */
    @Test(timeout=5000L)
    public void testGroupingFractionWithZeroOutput() throws Exception {
        setupGroupingFractionTest();

        for (int task = 0; task < 20; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 30; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(0), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
    }

    /**
     * When every task of both sources reports zero records, exactly one
     * reconfiguration to parallelism 0 is expected.
     */
    @Test(timeout=5000L)
    public void testZeroSrcOutput() throws Exception {
        setupDAGVertexOnly(10, 1L, 10, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        // v0 has 2 tasks and v1 has 3; each one reports an empty output.
        for (int task = 0; task < 2; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(0L, "v0", task));
        }
        for (int task = 0; task < 3; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(0L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(0), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
    }

    /**
     * With grouping disabled in the config, two sources of 2 and 3 tasks must lead to
     * exactly one reconfiguration to parallelism 6.
     */
    @Test(timeout=5000L)
    public void testDisableGrouping() throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, 1, 2, 3);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("v1");
        configBuilder.setMaxParallelism(30);
        configBuilder.setMinOpsPerWorker(1L);
        configBuilder.setEnableGrouping(false);
        vertexManager.initialize(configBuilder.build());

        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(6), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
    }

    /**
     * Two sources with heavily skewed record counts (15000 for v0 against 30 x 1 for v1):
     * expects one reconfiguration to parallelism 99 with chunks {99, 1}.
     */
    @Test(timeout=5000L)
    public void testParallelismTwoSkewedSource() throws Exception {
        setupDAGVertexOnly(100, 10000L, 10, 10);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(15000L, "v0", 0));
        for (int task = 0; task < 30; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(1L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(99), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        String[] expectedSources = {"v0", "v1"};
        int[] expectedChunks = {99, 1};
        verifyEdgeProperties(reconfiguredEdges.get("v0"), expectedSources, expectedChunks, 100);
    }

    /**
     * Three sources with skewed record counts (60000, 4000 and 40 x 3): expects one
     * reconfiguration to parallelism 93 with chunks {31, 3, 1}.
     */
    @Test(timeout=5000L)
    public void testParallelismThreeSkewedSource() throws Exception {
        String[] sourceNames = {"v0", "v1", "v2"};

        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(3);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, 10, 2, 3, 4);

        CartesianProductUserPayload.CartesianProductConfigProto.Builder configBuilder =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder();
        configBuilder.setIsPartitioned(false);
        configBuilder.addSources("v0");
        configBuilder.addSources("v1");
        configBuilder.addSources("v2");
        configBuilder.setMaxParallelism(100);
        configBuilder.setMinOpsPerWorker(10000L);
        configBuilder.setNumPartitionsForFairCase(10);
        vertexManager.initialize(configBuilder.build());

        for (String source : sourceNames) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate(source, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(60000L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(4000L, "v1", 0));
        for (int task = 0; task < 40; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(3L, "v2", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(93), (VertexLocationHint) Mockito.any(), edgePropertiesCaptor.capture());
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        int[] expectedChunks = {31, 3, 1};
        verifyEdgeProperties(reconfiguredEdges.get("v0"), sourceNames, expectedChunks, 100);
    }
}

