package org.apache.tez.runtime.library.cartesianproduct;

import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductUserPayload;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:org/apache/tez/runtime/library/cartesianproduct/TestFairCartesianProductVertexManager.class */
public class TestFairCartesianProductVertexManager {

    /** Captures the edge-property map argument of {@code reconfigureVertex} calls on {@link #ctx}. */
    @Captor
    private ArgumentCaptor<Map<String, EdgeProperty>> edgePropertiesCaptor;

    /** Captures the request-list argument of {@code scheduleTasks} calls on {@link #ctx}. */
    @Captor
    private ArgumentCaptor<List<VertexManagerPluginContext.ScheduleTaskRequest>> scheduleRequestCaptor;
    /** Vertex manager under test; constructed around {@link #ctx} in {@link #setup()}. */
    private FairCartesianProductVertexManager vertexManager;
    /** Mockito mock of the plugin context that the vertex manager talks to. */
    private VertexManagerPluginContext ctx;

    /**
     * Per-test fixture: initialises the {@code @Captor} fields, creates a fresh mocked
     * {@link VertexManagerPluginContext} and builds the vertex manager under test around it.
     */
    @Before
    public void setup() {
        MockitoAnnotations.openMocks(this);
        VertexManagerPluginContext mockedContext = Mockito.mock(VertexManagerPluginContext.class);
        ctx = mockedContext;
        vertexManager = new FairCartesianProductVertexManager(mockedContext);
    }

    /**
     * Stubs the mocked context with two cartesian-product source edges ("v0", "v1") and
     * initialises the vertex manager with an unpartitioned config over those two sources.
     *
     * @param i  value passed to {@code setMaxParallelism}
     * @param j  value passed to {@code setMinOpsPerWorker}
     * @param i2 value passed to {@code setNumPartitionsForFairCase}
     * @param i3 multiplier applied to the base task counts 2 ("v0") and 3 ("v1")
     * @throws Exception propagated from {@code initialize}
     */
    private void setupDAGVertexOnly(int i, long j, int i2, int i3) throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
        setSrcParallelism(ctx, i3, 2, 3);
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .setMaxParallelism(i)
                .setMinOpsPerWorker(j)
                .setNumPartitionsForFairCase(i2)
                .build();
        vertexManager.initialize(config);
    }

    /**
     * Same two-source setup as the vertex-only case, plus an extra input edge "v2" whose data
     * movement type is {@code BROADCAST}. Only "v0" and "v1" are listed as cartesian-product
     * sources in the config.
     *
     * @param i  value passed to both {@code setMaxParallelism} and
     *           {@code setNumPartitionsForFairCase}
     * @param j  value passed to {@code setMinOpsPerWorker}
     * @param i2 multiplier applied to the base task counts 2 ("v0"), 3 ("v1") and 5 ("v2")
     * @throws Exception propagated from {@code initialize}
     */
    private void setupDAGVertexOnlyWithBroadcast(int i, long j, int i2) throws Exception {
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        EdgeProperty broadcastEdge = EdgeProperty.create(
            EdgeProperty.DataMovementType.BROADCAST,
            (EdgeProperty.DataSourceType) null,
            (EdgeProperty.SchedulingType) null,
            (OutputDescriptor) null,
            (InputDescriptor) null);
        inputEdges.put("v2", broadcastEdge);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);
        setSrcParallelism(ctx, i2, 2, 3, 5);
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .setMaxParallelism(i)
                .setMinOpsPerWorker(j)
                .setNumPartitionsForFairCase(i)
                .build();
        vertexManager.initialize(config);
    }

    /**
     * Stubs the mocked context with three cartesian-product edges ("v0".."v2"), declares a vertex
     * group "g0" made of "v1" and "v2", and initialises the vertex manager with sources
     * "v0" and "g0".
     *
     * @param i  value passed to both {@code setNumPartitionsForFairCase} and
     *           {@code setMaxParallelism}
     * @param j  value passed to {@code setMinOpsPerWorker}
     * @param i2 multiplier applied to the base task counts 2, 3 and 4 of "v0", "v1", "v2"
     * @throws Exception propagated from {@code initialize}
     */
    private void setupDAGVertexGroup(int i, long j, int i2) throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
        setSrcParallelism(ctx, i2, 2, 3, 4);

        // Parameterised map instead of a raw HashMap so the stubbed return value is type-checked.
        Map<String, List<String>> vertexGroups = new HashMap<>();
        vertexGroups.put("g0", Arrays.asList("v1", "v2"));
        Mockito.when(ctx.getInputVertexGroups()).thenReturn(vertexGroups);

        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("g0")
                .setNumPartitionsForFairCase(i)
                .setMaxParallelism(i)
                .setMinOpsPerWorker(j)
                .build();
        vertexManager.initialize(config);
    }

    /**
     * Stubs the mocked context with four cartesian-product edges ("v0".."v3") arranged into two
     * vertex groups, "g0" = {v0, v1} and "g1" = {v2, v3}, and initialises the vertex manager with
     * sources "g0" and "g1".
     *
     * @param i  value passed to both {@code setNumPartitionsForFairCase} and
     *           {@code setMaxParallelism}
     * @param j  value passed to {@code setMinOpsPerWorker}
     * @param i2 multiplier applied to the base task counts 2, 3, 4 and 5 of "v0".."v3"
     * @throws Exception propagated from {@code initialize}
     */
    private void setupDAGVertexGroupOnly(int i, long j, int i2) throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(4));
        setSrcParallelism(ctx, i2, 2, 3, 4, 5);

        // Parameterised map instead of a raw HashMap so the stubbed return value is type-checked.
        Map<String, List<String>> vertexGroups = new HashMap<>();
        vertexGroups.put("g0", Arrays.asList("v0", "v1"));
        vertexGroups.put("g1", Arrays.asList("v2", "v3"));
        Mockito.when(ctx.getInputVertexGroups()).thenReturn(vertexGroups);

        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("g0")
                .addSources("g1")
                .setNumPartitionsForFairCase(i)
                .setMaxParallelism(i)
                .setMinOpsPerWorker(j)
                .build();
        vertexManager.initialize(config);
    }

    /**
     * Builds a mutable map with keys "v0" .. "v(i-1)", each mapped to a new {@link EdgeProperty}
     * whose edge manager descriptor names {@link CartesianProductEdgeManager}; the remaining
     * {@code EdgeProperty.create} arguments are {@code null}.
     *
     * @param i number of entries to create; zero or a negative value yields an empty map
     * @return a new {@link HashMap}; callers may add further entries
     */
    private Map<String, EdgeProperty> getEdgePropertyMap(int i) {
        // Parameterised instead of raw, so the return needs no unchecked conversion.
        Map<String, EdgeProperty> edges = new HashMap<>();
        String edgeManagerClass = CartesianProductEdgeManager.class.getName();
        for (int idx = 0; idx < i; idx++) {
            EdgeProperty edge = EdgeProperty.create(
                EdgeManagerPluginDescriptor.create(edgeManagerClass),
                (EdgeProperty.DataSourceType) null,
                (EdgeProperty.SchedulingType) null,
                (OutputDescriptor) null,
                (InputDescriptor) null);
            edges.put("v" + idx, edge);
        }
        return edges;
    }

    /**
     * Stubs {@code getVertexNumTasks("v<k>")} on the given mock for every position {@code k} of
     * {@code iArr}, returning {@code iArr[k] * i}.
     *
     * @param vertexManagerPluginContext the Mockito mock to stub
     * @param i                          multiplier applied to every base task count
     * @param iArr                       base task counts, in order, for "v0", "v1", ...
     */
    private void setSrcParallelism(VertexManagerPluginContext vertexManagerPluginContext, int i, int... iArr) {
        for (int idx = 0; idx < iArr.length; idx++) {
            String vertexName = "v" + idx;
            int numTasks = iArr[idx] * i;
            Mockito.when(vertexManagerPluginContext.getVertexNumTasks(Mockito.eq(vertexName)))
                .thenReturn(numTasks);
        }
    }

    /**
     * Creates a task-attempt identifier for DAG name "dag" and the given vertex name. The nested
     * ids use application id "0", zeros for the remaining DAG/vertex/attempt numbers, and
     * {@code i} as the task number.
     *
     * @param str vertex name stored in the identifier
     * @param i   task number passed to {@code TezTaskID.getInstance}
     * @return the new identifier
     */
    private TaskAttemptIdentifier getTaId(String str, int i) {
        TezDAGID dagId = TezDAGID.getInstance("0", 0, 0);
        TezVertexID vertexId = TezVertexID.getInstance(dagId, 0);
        TezTaskID taskId = TezTaskID.getInstance(vertexId, i);
        TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 0);
        return new TaskAttemptIdentifierImpl("dag", str, attemptId);
    }

    /**
     * Creates a {@link VertexManagerEvent} targeted at "cp vertex" whose payload is a serialised
     * {@code VertexManagerEventPayloadProto} carrying only {@code numRecord}, and tags it with the
     * producing task attempt.
     *
     * @param j   value stored via {@code setNumRecord}
     * @param str producer vertex name
     * @param i   producer task number
     * @return the populated event
     */
    private VertexManagerEvent getVMEvent(long j, String str, int i) {
        ShuffleUserPayloads.VertexManagerEventPayloadProto payload =
            ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder()
                .setNumRecord(j)
                .build();
        VertexManagerEvent event =
            VertexManagerEvent.create("cp vertex", payload.toByteString().asReadOnlyByteBuffer());
        event.setProducerAttemptIdentifier(getTaId(str, i));
        return event;
    }

    /**
     * Decodes the {@code CartesianProductConfigProto} stored in the edge manager descriptor's user
     * payload and asserts its source list, chunk counts and max parallelism.
     *
     * @param edgeProperty edge whose edge-manager payload is inspected
     * @param strArr       expected source names, in order
     * @param iArr         expected {@code numChunks} values, in order
     * @param i            expected {@code maxParallelism}
     * @throws InvalidProtocolBufferException if the payload cannot be parsed
     */
    private void verifyEdgeProperties(EdgeProperty edgeProperty, String[] strArr, int[] iArr, int i) throws InvalidProtocolBufferException {
        ByteString rawPayload =
            ByteString.copyFrom(edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload());
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.parseFrom(rawPayload);

        Object[] actualSources = config.getSourcesList().toArray();
        Assert.assertArrayEquals(strArr, actualSources);

        int[] actualChunks = Ints.toArray(config.getNumChunksList());
        Assert.assertArrayEquals(iArr, actualChunks);

        Assert.assertEquals(i, config.getMaxParallelism());
    }

    /**
     * Decodes the {@code CartesianProductConfigProto} stored in the edge manager descriptor's user
     * payload and asserts the vertex-group fields.
     *
     * @param edgeProperty edge whose edge-manager payload is inspected
     * @param i            expected {@code positionInGroup}
     * @param iArr         expected leading {@code numTaskPerVertexInGroup} values; only as many
     *                     entries as are supplied here are compared
     * @throws InvalidProtocolBufferException if the payload cannot be parsed
     */
    private void verifyVertexGroupInfo(EdgeProperty edgeProperty, int i, int... iArr) throws InvalidProtocolBufferException {
        ByteString rawPayload =
            ByteString.copyFrom(edgeProperty.getEdgeManagerDescriptor().getUserPayload().getPayload());
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.parseFrom(rawPayload);

        Assert.assertEquals(i, config.getPositionInGroup());
        for (int idx = 0; idx < iArr.length; idx++) {
            Assert.assertEquals(iArr[idx], config.getNumTaskPerVertexInGroup(idx));
        }
    }

    /**
     * Verifies that {@code scheduleTasks} has been invoked on the mocked context exactly
     * {@code i} times in total and, when {@code i > 0}, that the list returned by the captor's
     * {@code getValue()} starts with the given task indices.
     *
     * <p>Only the first {@code iArr.length} requests are compared; the captured list may be
     * longer. Callers rely on this, e.g. by passing no indices after a non-empty request.
     *
     * @param i    expected cumulative number of {@code scheduleTasks} invocations
     * @param iArr expected task indices of the leading requests, in order
     */
    private void verifyScheduleRequest(int i, int... iArr) {
        Mockito.verify(ctx, Mockito.times(i)).scheduleTasks(scheduleRequestCaptor.capture());
        if (i <= 0) {
            // Nothing was captured, so there is no request list to inspect.
            return;
        }
        // Typed list instead of a raw List: no per-element downcast needed.
        List<VertexManagerPluginContext.ScheduleTaskRequest> requests = scheduleRequestCaptor.getValue();
        for (int idx = 0; idx < iArr.length; idx++) {
            Assert.assertEquals(iArr[idx], requests.get(idx).getTaskIndex());
        }
    }

    /**
     * Two sources with 2 and 3 tasks, max parallelism 30, min ops per worker 1. Expects no
     * reconfiguration after the "v0" event alone, exactly one reconfiguration to 30 tasks with
     * chunk counts {5, 6} after the "v1" event, and then checks the task indices scheduled as
     * source tasks complete.
     */
    @Test(timeout = 5000)
    public void testDAGVertexOnlyGroupByMaxParallelism() throws Exception {
        setupDAGVertexOnly(30, 1L, 30, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        // Event from v0 only: reconfigureVertex must not have been called yet.
        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), Mockito.<VertexLocationHint>any(), Mockito.anyMap());

        // Event from v1 as well: exactly one reconfiguration to 30 tasks is expected.
        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));
        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(30), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());

        // Typed map instead of a raw Map: lookups need no EdgeProperty downcast.
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        String[] expectedSources = {"v0", "v1"};
        int[] expectedChunks = {5, 6};
        verifyEdgeProperties(reconfiguredEdges.get("v0"), expectedSources, expectedChunks, 30);
        verifyVertexGroupInfo(reconfiguredEdges.get("v0"), 0);
        verifyEdgeProperties(reconfiguredEdges.get("v1"), expectedSources, expectedChunks, 30);
        verifyVertexGroupInfo(reconfiguredEdges.get("v1"), 0);

        vertexManager.onVertexStarted(null);
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(1, 0, 6, 1, 7);

        vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
        verifyScheduleRequest(2, 12, 13, 18, 19, 24, 25);
    }

    /**
     * Two sources with 20 and 30 tasks (base counts 2 and 3 times multiplier 10), max parallelism
     * 100, min ops per worker 10000. After events from every source task, expects one
     * reconfiguration to 12 tasks with chunk counts {4, 3}, then checks scheduling after a subset
     * of source tasks completes.
     */
    @Test(timeout = 5000)
    public void testDAGVertexOnlyGroupByMinOpsPerWorker() throws Exception {
        setupDAGVertexOnly(100, 10000L, 10, 10);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        for (int task = 0; task < 20; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v0", task));
        }
        for (int task = 0; task < 30; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(10L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(12), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());

        // Typed map instead of a raw Map: lookups need no EdgeProperty downcast.
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        String[] expectedSources = {"v0", "v1"};
        int[] expectedChunks = {4, 3};
        verifyEdgeProperties(reconfiguredEdges.get("v0"), expectedSources, expectedChunks, 100);
        verifyEdgeProperties(reconfiguredEdges.get("v1"), expectedSources, expectedChunks, 100);

        vertexManager.onVertexStarted(null);
        verifyScheduleRequest(0);

        // Complete v0 tasks 0..4 and v1 tasks 10..19.
        for (int task = 0; task < 5; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int offset = 0; offset < 10; offset++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", 10 + offset));
        }
        verifyScheduleRequest(1, 1);
    }

    /**
     * Sources are vertex "v0" and group "g0" = {v1, v2}. Expects one reconfiguration to 100 tasks
     * with chunk counts {20, 5} on all three edges, then checks that scheduling waits for the
     * expected source-task completions.
     */
    @Test(timeout = 5000)
    public void testDAGVertexGroup() throws Exception {
        setupDAGVertexGroup(100, 1L, 1);
        for (int v = 0; v < 3; v++) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + v, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(10L, "v1", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 1));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(100), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());

        // Typed map instead of a raw Map: lookups need no EdgeProperty downcast.
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        for (int v = 0; v < 3; v++) {
            verifyEdgeProperties(reconfiguredEdges.get("v" + v), new String[]{"v0", "g0"}, new int[]{20, 5}, 100);
        }

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v2", 0));
        verifyScheduleRequest(1, 0, 5, 10, 15, 20, 25, 30, 35, 40, 45);

        // Completing v1 task 1 alone must not trigger another scheduleTasks call.
        vertexManager.onSourceTaskCompleted(getTaId("v1", 1));
        verifyScheduleRequest(1);

        vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
        verifyScheduleRequest(2, 1, 6, 11, 16, 21, 26, 31, 36, 41, 46);
    }

    /**
     * Both sources are vertex groups: "g0" = {v0, v1} and "g1" = {v2, v3}. Expects one
     * reconfiguration to 100 tasks with chunk counts {10, 10} on all four edges, checks each
     * edge's position-in-group information, and then checks scheduling after source-task
     * completions.
     */
    @Test(timeout = 5000)
    public void testDAGVertexGroupOnly() throws Exception {
        setupDAGVertexGroupOnly(100, 1L, 1);
        for (int v = 0; v < 4; v++) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate("v" + v, VertexState.CONFIGURED));
        }

        vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(20L, "v1", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(5L, "v2", 1));
        vertexManager.onVertexManagerEventReceived(getVMEvent(16L, "v3", 0));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(100), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());

        // Typed map instead of a raw Map: lookups need no EdgeProperty downcast.
        Map<String, EdgeProperty> reconfiguredEdges = edgePropertiesCaptor.getValue();
        for (int v = 0; v < 4; v++) {
            verifyEdgeProperties(reconfiguredEdges.get("v" + v), new String[]{"g0", "g1"}, new int[]{10, 10}, 100);
        }

        // The first vertex of each group is expected at position 0, the second at position 1.
        verifyVertexGroupInfo(reconfiguredEdges.get("v0"), 0);
        verifyVertexGroupInfo(reconfiguredEdges.get("v1"), 1, 2);
        verifyVertexGroupInfo(reconfiguredEdges.get("v2"), 0);
        verifyVertexGroupInfo(reconfiguredEdges.get("v3"), 1, 4);

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v2", 1));
        verifyScheduleRequest(0);

        vertexManager.onSourceTaskCompleted(getTaId("v3", 1));
        verifyScheduleRequest(1, 3, 13, 23);
    }

    /**
     * Two cartesian-product sources plus a broadcast input "v2". Expects the reconfigured edge map
     * to omit "v2", and expects no tasks to be scheduled until "v2" is reported as RUNNING.
     */
    @Test(timeout = 5000)
    public void testSchedulingVertexOnlyWithBroadcast() throws Exception {
        setupDAGVertexOnlyWithBroadcast(30, 1L, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));
        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(30), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
        boolean broadcastEdgeReconfigured = edgePropertiesCaptor.getValue().containsKey("v2");
        Assert.assertFalse(broadcastEdgeReconfigured);

        vertexManager.onVertexStarted(null);
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v1", 0));
        verifyScheduleRequest(0);

        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.RUNNING));
        verifyScheduleRequest(1, 0, 1, 6, 7);
    }

    /**
     * Expects no scheduling before the vertex starts, and one {@code scheduleTasks} call beginning
     * with task 0 once {@code onVertexStarted} is given already-completed attempts of "v0" and
     * "v1".
     */
    @Test(timeout = 5000)
    public void testOnVertexStart() throws Exception {
        setupDAGVertexOnly(6, 1L, 6, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(100L, "v1", 0));
        verifyScheduleRequest(0);

        List<TaskAttemptIdentifier> alreadyCompleted = Arrays.asList(getTaId("v0", 0), getTaId("v1", 0));
        vertexManager.onVertexStarted(alreadyCompleted);
        verifyScheduleRequest(1, 0);
    }

    /**
     * Smoke test for a source vertex that reports zero tasks ("v1"). There are no explicit
     * assertions; the test passes when the calls below complete without throwing, within the
     * timeout.
     */
    @Test(timeout = 5000)
    public void testZeroSrcTask() throws Exception {
        // Replaces the instances created by setup() with fresh ones.
        ctx = Mockito.mock(VertexManagerPluginContext.class);
        vertexManager = new FairCartesianProductVertexManager(ctx);
        Mockito.when(ctx.getVertexNumTasks("v0")).thenReturn(2);
        Mockito.when(ctx.getVertexNumTasks("v1")).thenReturn(0);

        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .addNumChunks(2)
                .addNumChunks(3)
                .setMaxParallelism(6)
                .build();

        // The shared helper builds the same typed "v0"/"v1" edge map that was hand-built here
        // with a raw HashMap.
        Map<String, EdgeProperty> inputEdges = getEdgePropertyMap(2);
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(inputEdges);

        vertexManager.initialize(config);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexStarted(new ArrayList<TaskAttemptIdentifier>());
        vertexManager.onSourceTaskCompleted(getTaId("v0", 0));
        vertexManager.onSourceTaskCompleted(getTaId("v0", 1));
    }

    /**
     * Shared fixture for the grouping-fraction tests: two sources with 20 and 30 tasks (base
     * counts 2 and 3 times multiplier 10), max parallelism 30, min ops per worker 1, 30 partitions
     * for the fair case and a grouping fraction of 0.5. Both sources are reported as CONFIGURED.
     *
     * @throws Exception propagated from {@code initialize}
     */
    private void setupGroupingFractionTest() throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
        setSrcParallelism(ctx, 10, 2, 3);
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .setMaxParallelism(30)
                .setMinOpsPerWorker(1L)
                .setNumPartitionsForFairCase(30)
                .setGroupingFraction(0.5f)
                .build();
        vertexManager.initialize(config);
        for (String source : new String[]{"v0", "v1"}) {
            vertexManager.onVertexStateUpdated(new VertexStateUpdate(source, VertexState.CONFIGURED));
        }
    }

    /**
     * With the grouping-fraction fixture, expects no reconfiguration after 10 completed "v0" tasks
     * and 14 completed "v1" tasks, and exactly one reconfiguration to 24 tasks once the 15th "v1"
     * task completes.
     */
    @Test(timeout = 5000)
    public void testGroupingFraction() throws Exception {
        setupGroupingFractionTest();
        vertexManager.onVertexManagerEventReceived(getVMEvent(10000L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(10000L, "v1", 0));

        for (int task = 0; task < 10; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 14; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }
        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), Mockito.<VertexLocationHint>any(), Mockito.anyMap());

        vertexManager.onSourceTaskCompleted(getTaId("v1", 14));
        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(24), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
    }

    /**
     * With the grouping-fraction fixture and no vertex-manager events delivered, expects no
     * reconfiguration after 10 "v0" and 15 "v1" task completions.
     */
    @Test(timeout = 5000)
    public void testGroupFractionWithZeroStats() throws Exception {
        setupGroupingFractionTest();

        for (int task = 0; task < 10; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 15; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }

        Mockito.verify(ctx, Mockito.never())
            .reconfigureVertex(Mockito.anyInt(), Mockito.<VertexLocationHint>any(), Mockito.anyMap());
    }

    /**
     * With the grouping-fraction fixture and no vertex-manager events delivered, expects exactly
     * one reconfiguration to parallelism 0 after all 20 "v0" and all 30 "v1" tasks complete.
     */
    @Test(timeout = 5000)
    public void testGroupingFractionWithZeroOutput() throws Exception {
        setupGroupingFractionTest();

        for (int task = 0; task < 20; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v0", task));
        }
        for (int task = 0; task < 30; task++) {
            vertexManager.onSourceTaskCompleted(getTaId("v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(0), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
    }

    /**
     * Every task of both sources (2 for "v0", 3 for "v1") sends an event with a record count of 0;
     * expects exactly one reconfiguration to parallelism 0.
     */
    @Test(timeout = 5000)
    public void testZeroSrcOutput() throws Exception {
        setupDAGVertexOnly(10, 1L, 10, 1);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        for (int task = 0; task < 2; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(0L, "v0", task));
        }
        for (int task = 0; task < 3; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(0L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(0), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
    }

    /**
     * With {@code enableGrouping} set to false and sources of 2 and 3 tasks, expects exactly one
     * reconfiguration to 6 tasks after one event from each source.
     */
    @Test(timeout = 5000)
    public void testDisableGrouping() throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(2));
        setSrcParallelism(ctx, 1, 2, 3);
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .setMaxParallelism(30)
                .setMinOpsPerWorker(1L)
                .setEnableGrouping(false)
                .build();
        vertexManager.initialize(config);

        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexManagerEventReceived(getVMEvent(250L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(200L, "v1", 0));

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(6), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
    }

    /**
     * Two sources with very different record counts: one "v0" event of 15000 records versus 30
     * "v1" events of 1 record each. Expects one reconfiguration to 99 tasks with chunk counts
     * {99, 1}.
     */
    @Test(timeout = 5000)
    public void testParallelismTwoSkewedSource() throws Exception {
        setupDAGVertexOnly(100, 10000L, 10, 10);
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(15000L, "v0", 0));
        for (int task = 0; task < 30; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(1L, "v1", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(99), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
        EdgeProperty v0Edge = edgePropertiesCaptor.getValue().get("v0");
        verifyEdgeProperties(v0Edge, new String[]{"v0", "v1"}, new int[]{99, 1}, 100);
    }

    /**
     * Three sources with very different record counts: 60000 from "v0", 4000 from "v1", and 40
     * "v2" events of 3 records each. Expects one reconfiguration to 93 tasks with chunk counts
     * {31, 3, 1}.
     */
    @Test(timeout = 5000)
    public void testParallelismThreeSkewedSource() throws Exception {
        Mockito.when(ctx.getInputVertexEdgeProperties()).thenReturn(getEdgePropertyMap(3));
        setSrcParallelism(ctx, 10, 2, 3, 4);
        CartesianProductUserPayload.CartesianProductConfigProto config =
            CartesianProductUserPayload.CartesianProductConfigProto.newBuilder()
                .setIsPartitioned(false)
                .addSources("v0")
                .addSources("v1")
                .addSources("v2")
                .setMaxParallelism(100)
                .setMinOpsPerWorker(10000L)
                .setNumPartitionsForFairCase(10)
                .build();
        vertexManager.initialize(config);

        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v0", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v1", VertexState.CONFIGURED));
        vertexManager.onVertexStateUpdated(new VertexStateUpdate("v2", VertexState.CONFIGURED));

        vertexManager.onVertexManagerEventReceived(getVMEvent(60000L, "v0", 0));
        vertexManager.onVertexManagerEventReceived(getVMEvent(4000L, "v1", 0));
        for (int task = 0; task < 40; task++) {
            vertexManager.onVertexManagerEventReceived(getVMEvent(3L, "v2", task));
        }

        Mockito.verify(ctx, Mockito.times(1))
            .reconfigureVertex(Mockito.eq(93), Mockito.<VertexLocationHint>any(), edgePropertiesCaptor.capture());
        EdgeProperty v0Edge = edgePropertiesCaptor.getValue().get("v0");
        verifyEdgeProperties(v0Edge, new String[]{"v0", "v1", "v2"}, new int[]{31, 3, 1}, 100);
    }
}
