/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchExecutionOptionsInternal;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.adaptivebatch.PointwiseBlockingResultInfo;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class DefaultVertexParallelismAndInputInfosDeciderTest {
    // Byte sizes, written as shifts so the unit is visible (<< 20 = MiB, << 30 = GiB, << 40 = TiB).
    private static final long BYTE_256_MB = 256L << 20;
    private static final long BYTE_512_MB = 512L << 20;
    private static final long BYTE_1_GB = 1L << 30;
    private static final long BYTE_8_GB = 8L << 30;
    private static final long BYTE_1_TB = 1L << 40;

    // Parallelism bounds, default source parallelism and per-task data volume constants.
    private static final int MAX_PARALLELISM = 100;
    private static final int MIN_PARALLELISM = 3;
    private static final int VERTEX_MAX_PARALLELISM = 256;
    private static final int DEFAULT_SOURCE_PARALLELISM = 10;
    private static final long DATA_VOLUME_PER_TASK = 1L << 30;

    // Explicit no-argument constructor with an empty body; package-private like the class itself.
    DefaultVertexParallelismAndInputInfosDeciderTest() {
    }

    /**
     * Decides the parallelism for a 256 MB broadcast input plus an (8 GB + 256 MB) non-broadcast
     * input with the default decider settings and expects the result to be 9.
     */
    @Test
    void testDecideParallelism() {
        final BlockingResultInfo broadcastResult = createFromBroadcastResult(BYTE_256_MB);
        final BlockingResultInfo nonBroadcastResult =
                createFromNonBroadcastResult(BYTE_8_GB + BYTE_256_MB);

        final int decidedParallelism =
                createDeciderAndDecideParallelism(
                        Arrays.asList(broadcastResult, nonBroadcastResult));

        Assertions.assertThat(decidedParallelism).isEqualTo(9);
    }

    /**
     * With a 256 MB broadcast input and a (1 TB + 8 GB) non-broadcast input, the decided
     * parallelism is expected to equal the max parallelism (100).
     */
    @Test
    void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() {
        final BlockingResultInfo broadcastResult = createFromBroadcastResult(BYTE_256_MB);
        final BlockingResultInfo nonBroadcastResult =
                createFromNonBroadcastResult(BYTE_1_TB + BYTE_8_GB);

        final int decidedParallelism =
                createDeciderAndDecideParallelism(
                        Arrays.asList(broadcastResult, nonBroadcastResult));

        Assertions.assertThat(decidedParallelism).isEqualTo(MAX_PARALLELISM);
    }

    /**
     * With a 256 MB broadcast input and a 512 MB non-broadcast input, the decided parallelism is
     * expected to equal the min parallelism (3).
     */
    @Test
    void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() {
        final BlockingResultInfo broadcastResult = createFromBroadcastResult(BYTE_256_MB);
        final BlockingResultInfo nonBroadcastResult = createFromNonBroadcastResult(BYTE_512_MB);

        final int decidedParallelism =
                createDeciderAndDecideParallelism(
                        Arrays.asList(broadcastResult, nonBroadcastResult));

        Assertions.assertThat(decidedParallelism).isEqualTo(MIN_PARALLELISM);
    }

    /**
     * Uses a 512 MB broadcast input and an (8 GB + 256 MB) non-broadcast input, and expects the
     * decided parallelism to be 9.
     */
    @Test
    void testNonBroadcastBytesCanNotDividedEvenly() {
        final BlockingResultInfo broadcastResult = createFromBroadcastResult(BYTE_512_MB);
        final BlockingResultInfo nonBroadcastResult =
                createFromNonBroadcastResult(BYTE_8_GB + BYTE_256_MB);

        final int decidedParallelism =
                createDeciderAndDecideParallelism(
                        Arrays.asList(broadcastResult, nonBroadcastResult));

        Assertions.assertThat(decidedParallelism).isEqualTo(9);
    }

    /**
     * Two all-to-all inputs with ten subpartitions each, min parallelism 1, max parallelism 10 and
     * 60 bytes per task: expects parallelism 5 and identical subpartition ranges for both inputs.
     */
    @Test
    void testAllEdgesAllToAll() {
        final AllToAllBlockingResultInfo firstInput =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        final AllToAllBlockingResultInfo secondInput =
                createAllToAllBlockingResultInfo(
                        new long[] {8L, 12L, 21L, 9L, 13L, 7L, 19L, 13L, 14L, 5L});

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        1, 10, 60L, Arrays.asList(firstInput, secondInput));

        Assertions.assertThat(decided.getParallelism()).isEqualTo(5);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(2);

        final List<IndexRange> expectedRanges =
                Arrays.asList(
                        new IndexRange(0, 1),
                        new IndexRange(2, 3),
                        new IndexRange(4, 6),
                        new IndexRange(7, 8),
                        new IndexRange(9, 9));
        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(firstInput.getResultId()), expectedRanges);
        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(secondInput.getResultId()), expectedRanges);
    }

    /**
     * One all-to-all input with min parallelism 1, max parallelism 2 and 10 bytes per task: expects
     * the parallelism to be 2 and the ten subpartitions to be split into [0, 5] and [6, 9].
     */
    @Test
    void testAllEdgesAllToAllAndDecidedParallelismIsMaxParallelism() {
        final AllToAllBlockingResultInfo input =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        1, 2, 10L, Collections.singletonList(input));

        Assertions.assertThat(decided.getParallelism()).isEqualTo(2);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(1);

        final List<IndexRange> expectedRanges =
                Arrays.asList(new IndexRange(0, 5), new IndexRange(6, 9));
        checkAllToAllJobVertexInputInfo(
                Iterables.getOnlyElement(decided.getJobVertexInputInfos().values()),
                expectedRanges);
    }

    /**
     * One all-to-all input with min parallelism 4, max parallelism 10 and 1000 bytes per task:
     * expects the parallelism to be 4 and checks the resulting four subpartition ranges.
     */
    @Test
    void testAllEdgesAllToAllAndDecidedParallelismIsMinParallelism() {
        final AllToAllBlockingResultInfo input =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        4, 10, 1000L, Collections.singletonList(input));

        Assertions.assertThat(decided.getParallelism()).isEqualTo(4);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(1);

        final List<IndexRange> expectedRanges =
                Arrays.asList(
                        new IndexRange(0, 1),
                        new IndexRange(2, 5),
                        new IndexRange(6, 7),
                        new IndexRange(8, 9));
        checkAllToAllJobVertexInputInfo(
                Iterables.getOnlyElement(decided.getJobVertexInputInfos().values()),
                expectedRanges);
    }

    /**
     * One all-to-all input with alternating 10/1 byte subpartitions, min and max parallelism both
     * 8 and 10 bytes per task: expects parallelism 8 and the eight subpartition ranges below.
     */
    @Test
    void testFallBackToEvenlyDistributeSubpartitions() {
        final AllToAllBlockingResultInfo input =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 1L, 10L, 1L, 10L, 1L, 10L, 1L, 10L, 1L});

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        8, 8, 10L, Collections.singletonList(input));

        Assertions.assertThat(decided.getParallelism()).isEqualTo(8);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(1);

        final List<IndexRange> expectedRanges =
                Arrays.asList(
                        new IndexRange(0, 0),
                        new IndexRange(1, 1),
                        new IndexRange(2, 2),
                        new IndexRange(3, 4),
                        new IndexRange(5, 5),
                        new IndexRange(6, 6),
                        new IndexRange(7, 7),
                        new IndexRange(8, 9));
        checkAllToAllJobVertexInputInfo(
                Iterables.getOnlyElement(decided.getJobVertexInputInfos().values()),
                expectedRanges);
    }

    /**
     * One non-broadcast all-to-all input plus one broadcast all-to-all input. The broadcast input
     * has a single 10-byte subpartition when {@code singleSubpartitionContainsAllData} is true and
     * ten 1-byte subpartitions otherwise. Expects parallelism 3 in both cases, with every subtask
     * consuming the full subpartition range of the broadcast input.
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testAllEdgesAllToAllAndOneIsBroadcast(boolean singleSubpartitionContainsAllData) {
        final AllToAllBlockingResultInfo nonBroadcastInput =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L}, false, false);

        final long[] broadcastBytes =
                singleSubpartitionContainsAllData
                        ? new long[] {10L}
                        : new long[] {1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L};
        final AllToAllBlockingResultInfo broadcastInput =
                createAllToAllBlockingResultInfo(
                        broadcastBytes, true, singleSubpartitionContainsAllData);

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        1, 10, 60L, Arrays.asList(nonBroadcastInput, broadcastInput));

        Assertions.assertThat(decided.getParallelism()).isEqualTo(3);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(2);

        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(nonBroadcastInput.getResultId()),
                Arrays.asList(new IndexRange(0, 4), new IndexRange(5, 8), new IndexRange(9, 9)));

        // The last subpartition index of the broadcast input depends on how it was created above.
        final int lastBroadcastSubpartition = singleSubpartitionContainsAllData ? 0 : 9;
        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(broadcastInput.getResultId()),
                Arrays.asList(
                        new IndexRange(0, lastBroadcastSubpartition),
                        new IndexRange(0, lastBroadcastSubpartition),
                        new IndexRange(0, lastBroadcastSubpartition)));
    }

    /**
     * Two broadcast all-to-all inputs, each with a single 10-byte subpartition: expects
     * parallelism 1 and the subpartition range [0, 0] for both inputs.
     */
    @Test
    void testAllEdgesBroadcast() {
        final AllToAllBlockingResultInfo firstInput =
                createAllToAllBlockingResultInfo(new long[] {10L}, true, false);
        final AllToAllBlockingResultInfo secondInput =
                createAllToAllBlockingResultInfo(new long[] {10L}, true, false);

        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        1, 10, 60L, Arrays.asList(firstInput, secondInput));

        Assertions.assertThat(decided.getParallelism()).isOne();
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(2);

        final List<IndexRange> expectedRanges = Collections.singletonList(new IndexRange(0, 0));
        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(firstInput.getResultId()), expectedRanges);
        checkAllToAllJobVertexInputInfo(
                decided.getJobVertexInputInfos().get(secondInput.getResultId()), expectedRanges);
    }

    @Test
    void testHavePointwiseEdges() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        PointwiseBlockingResultInfo resultInfo2 = this.createPointwiseBlockingResultInfo({8L, 12L, 21L, 9L, 13L}, {7L, 19L, 13L, 14L, 5L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(4);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Arrays.asList(new IndexRange(0, 1), new IndexRange(2, 5), new IndexRange(6, 7), new IndexRange(8, 9)));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), Arrays.asList(Map.of(new IndexRange(0, 0), new IndexRange(0, 1)), Map.of(new IndexRange(0, 0), new IndexRange(2, 3)), Map.of(new IndexRange(0, 0), new IndexRange(4, 4), new IndexRange(1, 1), new IndexRange(0, 1)), Map.of(new IndexRange(1, 1), new IndexRange(2, 4))));
    }

    @Test
    void testHavePointwiseAndBroadcastEdge() {
        AllToAllBlockingResultInfo resultInfo1 = this.createAllToAllBlockingResultInfo(new long[]{10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L}, true, false);
        PointwiseBlockingResultInfo resultInfo2 = this.createPointwiseBlockingResultInfo({8L, 12L, 21L, 9L, 13L}, {7L, 19L, 13L, 14L, 5L});
        ParallelismAndInputInfos parallelismAndInputInfos = DefaultVertexParallelismAndInputInfosDeciderTest.createDeciderAndDecideParallelismAndInputInfos(1, 10, 60L, Arrays.asList(resultInfo1, resultInfo2));
        Assertions.assertThat((int)parallelismAndInputInfos.getParallelism()).isEqualTo(6);
        Assertions.assertThat((Map)parallelismAndInputInfos.getJobVertexInputInfos()).hasSize(2);
        DefaultVertexParallelismAndInputInfosDeciderTest.checkAllToAllJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo1.getResultId()), Arrays.asList(new IndexRange(0, 9), new IndexRange(0, 9), new IndexRange(0, 9), new IndexRange(0, 9), new IndexRange(0, 9), new IndexRange(0, 9)));
        DefaultVertexParallelismAndInputInfosDeciderTest.checkJobVertexInputInfo((JobVertexInputInfo)parallelismAndInputInfos.getJobVertexInputInfos().get(resultInfo2.getResultId()), Arrays.asList(Map.of(new IndexRange(0, 0), new IndexRange(0, 1)), Map.of(new IndexRange(0, 0), new IndexRange(2, 3)), Map.of(new IndexRange(0, 0), new IndexRange(4, 4), new IndexRange(1, 1), new IndexRange(0, 0)), Map.of(new IndexRange(1, 1), new IndexRange(1, 1)), Map.of(new IndexRange(1, 1), new IndexRange(2, 3)), Map.of(new IndexRange(1, 1), new IndexRange(4, 4))));
    }

    /**
     * Decides parallelism and input infos for a vertex without consumed results: expects the
     * parallelism to be 10 and no job vertex input infos.
     */
    @Test
    void testSourceJobVertex() {
        final ParallelismAndInputInfos decided =
                createDeciderAndDecideParallelismAndInputInfos(
                        MIN_PARALLELISM,
                        MAX_PARALLELISM,
                        DATA_VOLUME_PER_TASK,
                        Collections.emptyList());

        Assertions.assertThat(decided.getParallelism()).isEqualTo(DEFAULT_SOURCE_PARALLELISM);
        Assertions.assertThat(decided.getJobVertexInputInfos()).isEmpty();
    }

    /**
     * Calls the decider directly for a vertex with one all-to-all input, passing 4 in the argument
     * position where the other helpers in this class pass their min parallelism. Expects
     * parallelism 4 and the four subpartition ranges below.
     */
    @Test
    void testDynamicSourceParallelismWithUpstreamInputs() {
        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDecider(MIN_PARALLELISM, MAX_PARALLELISM, DATA_VOLUME_PER_TASK);
        final AllToAllBlockingResultInfo input =
                createAllToAllBlockingResultInfo(
                        new long[] {10L, 15L, 13L, 12L, 1L, 10L, 8L, 20L, 12L, 17L});
        final int dynamicSourceParallelism = 4;

        final ParallelismAndInputInfos decided =
                decider.decideParallelismAndInputInfosForVertex(
                        new JobVertexID(),
                        Collections.singletonList(toBlockingInputInfoView(input)),
                        -1,
                        dynamicSourceParallelism,
                        MAX_PARALLELISM);

        Assertions.assertThat(decided.getParallelism()).isEqualTo(4);
        Assertions.assertThat(decided.getJobVertexInputInfos()).hasSize(1);
        checkAllToAllJobVertexInputInfo(
                Iterables.getOnlyElement(decided.getJobVertexInputInfos().values()),
                Arrays.asList(
                        new IndexRange(0, 1),
                        new IndexRange(2, 5),
                        new IndexRange(6, 7),
                        new IndexRange(8, 9)));
    }

    /**
     * With the default source parallelism option set to 10, a global max parallelism of 100 and a
     * vertex max parallelism of 256, the computed source parallelism upper bound is expected to
     * be 10.
     */
    @Test
    void testComputeSourceParallelismUpperBound() {
        final Configuration configuration = new Configuration();
        // The value is passed as a plain int: an (Object) cast here prevents T from being
        // inferred from the option's value type, so the call would not compile.
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                DEFAULT_SOURCE_PARALLELISM);

        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDefaultVertexParallelismAndInputInfosDecider(MAX_PARALLELISM, configuration);

        Assertions.assertThat(
                        decider.computeSourceParallelismUpperBound(
                                new JobVertexID(), VERTEX_MAX_PARALLELISM))
                .isEqualTo(DEFAULT_SOURCE_PARALLELISM);
    }

    /**
     * With an otherwise empty configuration, a global max parallelism of 100 and a vertex max
     * parallelism of 256, the computed source parallelism upper bound is expected to be 100.
     */
    @Test
    void testComputeSourceParallelismUpperBoundFallback() {
        final Configuration emptyConfiguration = new Configuration();
        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDefaultVertexParallelismAndInputInfosDecider(
                        MAX_PARALLELISM, emptyConfiguration);

        final int upperBound =
                decider.computeSourceParallelismUpperBound(
                        new JobVertexID(), VERTEX_MAX_PARALLELISM);

        Assertions.assertThat(upperBound).isEqualTo(MAX_PARALLELISM);
    }

    /**
     * With the default source parallelism option set to 512 (twice the vertex max parallelism of
     * 256), the computed source parallelism upper bound is expected to be 256.
     */
    @Test
    void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() {
        final Configuration configuration = new Configuration();
        // The value is passed as a plain int: an (Object) cast here prevents T from being
        // inferred from the option's value type, so the call would not compile.
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                VERTEX_MAX_PARALLELISM * 2);

        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDefaultVertexParallelismAndInputInfosDecider(MAX_PARALLELISM, configuration);

        Assertions.assertThat(
                        decider.computeSourceParallelismUpperBound(
                                new JobVertexID(), VERTEX_MAX_PARALLELISM))
                .isEqualTo(VERTEX_MAX_PARALLELISM);
    }

    /**
     * Convenience overload that delegates to the three-argument variant with the fixed index
     * range {@code [0, 0]}.
     */
    private static void checkAllToAllJobVertexInputInfo(
            JobVertexInputInfo jobVertexInputInfo, List<IndexRange> subpartitionRanges) {
        final IndexRange singleIndexRange = new IndexRange(0, 0);
        checkAllToAllJobVertexInputInfo(jobVertexInputInfo, singleIndexRange, subpartitionRanges);
    }

    /**
     * Asserts that {@code jobVertexInputInfo} holds exactly one {@link ExecutionVertexInputInfo}
     * per entry of {@code subpartitionRanges}, in any order.
     *
     * <p>The expected info for position {@code i} is built from {@code i}, the shared {@code
     * indexRange} (presumably the consumed partition range) and the i-th subpartition range.
     */
    private static void checkAllToAllJobVertexInputInfo(
            JobVertexInputInfo jobVertexInputInfo,
            IndexRange indexRange,
            List<IndexRange> subpartitionRanges) {
        final List<ExecutionVertexInputInfo> expectedInfos =
                new ArrayList<>(subpartitionRanges.size());
        int subtaskIndex = 0;
        for (IndexRange subpartitionRange : subpartitionRanges) {
            expectedInfos.add(
                    new ExecutionVertexInputInfo(subtaskIndex, indexRange, subpartitionRange));
            subtaskIndex++;
        }
        Assertions.assertThat(jobVertexInputInfo.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrderElementsOf(expectedInfos);
    }

    /**
     * Asserts that {@code jobVertexInputInfo} holds exactly one {@link ExecutionVertexInputInfo}
     * per entry of {@code consumedSubpartitionGroups}, in any order. The expected info for
     * position {@code i} is built from {@code i} and the i-th map.
     */
    private static void checkJobVertexInputInfo(
            JobVertexInputInfo jobVertexInputInfo,
            List<Map<IndexRange, IndexRange>> consumedSubpartitionGroups) {
        final List<ExecutionVertexInputInfo> expectedInfos =
                new ArrayList<>(consumedSubpartitionGroups.size());
        int subtaskIndex = 0;
        for (Map<IndexRange, IndexRange> consumedGroup : consumedSubpartitionGroups) {
            expectedInfos.add(new ExecutionVertexInputInfo(subtaskIndex, consumedGroup));
            subtaskIndex++;
        }
        Assertions.assertThat(jobVertexInputInfo.getExecutionVertexInputInfos())
                .containsExactlyInAnyOrderElementsOf(expectedInfos);
    }

    /**
     * Creates a decider with the given bounds and per-task data volume, using a default source
     * parallelism of 10.
     */
    static DefaultVertexParallelismAndInputInfosDecider createDecider(
            int minParallelism, int maxParallelism, long dataVolumePerTask) {
        return createDecider(
                minParallelism, maxParallelism, dataVolumePerTask, DEFAULT_SOURCE_PARALLELISM);
    }

    /**
     * Creates a decider from a fresh {@link Configuration} populated with the given min
     * parallelism, average data volume per task and default source parallelism.
     *
     * @param minParallelism value for the adaptive auto-parallelism min-parallelism option
     * @param maxParallelism max parallelism handed to the decider factory
     * @param dataVolumePerTask average data volume per task, in bytes
     * @param defaultSourceParallelism value for the default-source-parallelism option
     * @return the configured decider
     */
    static DefaultVertexParallelismAndInputInfosDecider createDecider(
            int minParallelism,
            int maxParallelism,
            long dataVolumePerTask,
            int defaultSourceParallelism) {
        final Configuration configuration = new Configuration();
        // Values are passed with their real types: an (Object) cast prevents T from being
        // inferred from the option's value type, so the calls would not compile.
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, minParallelism);
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
                new MemorySize(dataVolumePerTask));
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                defaultSourceParallelism);
        return createDefaultVertexParallelismAndInputInfosDecider(maxParallelism, configuration);
    }

    /**
     * Builds a decider via {@code DefaultVertexParallelismAndInputInfosDecider.from}, using the
     * default values of the skewed-factor and skewed-threshold options together with the given
     * max parallelism and configuration.
     */
    static DefaultVertexParallelismAndInputInfosDecider
            createDefaultVertexParallelismAndInputInfosDecider(
                    int maxParallelism, Configuration configuration) {
        final double defaultSkewedFactor =
                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_FACTOR
                        .defaultValue();
        final long defaultSkewedThresholdBytes =
                BatchExecutionOptionsInternal.ADAPTIVE_SKEWED_OPTIMIZATION_SKEWED_THRESHOLD
                        .defaultValue()
                        .getBytes();
        return DefaultVertexParallelismAndInputInfosDecider.from(
                maxParallelism, defaultSkewedFactor, defaultSkewedThresholdBytes, configuration);
    }

    /**
     * Decides the parallelism for the given consumed results with the default settings: min
     * parallelism 3, max parallelism 100 and 1 GB of data per task.
     */
    private static int createDeciderAndDecideParallelism(List<BlockingResultInfo> consumedResults) {
        return createDeciderAndDecideParallelism(
                MIN_PARALLELISM, MAX_PARALLELISM, DATA_VOLUME_PER_TASK, consumedResults);
    }

    /**
     * Creates a decider with the given settings and returns the parallelism it decides for a
     * fresh {@link JobVertexID} consuming {@code consumedResults}.
     */
    private static int createDeciderAndDecideParallelism(
            int minParallelism,
            int maxParallelism,
            long dataVolumePerTask,
            List<BlockingResultInfo> consumedResults) {
        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDecider(minParallelism, maxParallelism, dataVolumePerTask);
        final JobVertexID vertexId = new JobVertexID();
        final List<BlockingInputInfo> inputInfos = toBlockingInputInfoViews(consumedResults);
        return decider.decideParallelism(vertexId, inputInfos, minParallelism, maxParallelism);
    }

    /**
     * Creates a decider with the given settings and returns the parallelism and input infos it
     * decides for a fresh {@link JobVertexID} consuming {@code consumedResults}. The third
     * argument of the decider call is always {@code -1}.
     */
    private static ParallelismAndInputInfos createDeciderAndDecideParallelismAndInputInfos(
            int minParallelism,
            int maxParallelism,
            long dataVolumePerTask,
            List<BlockingResultInfo> consumedResults) {
        final DefaultVertexParallelismAndInputInfosDecider decider =
                createDecider(minParallelism, maxParallelism, dataVolumePerTask);
        final JobVertexID vertexId = new JobVertexID();
        final List<BlockingInputInfo> inputInfos = toBlockingInputInfoViews(consumedResults);
        return decider.decideParallelismAndInputInfosForVertex(
                vertexId, inputInfos, -1, minParallelism, maxParallelism);
    }

    /**
     * Creates an all-to-all result info with both the broadcast flag and the
     * single-subpartition-contains-all-data flag set to false.
     */
    private AllToAllBlockingResultInfo createAllToAllBlockingResultInfo(
            long[] aggregatedSubpartitionBytes) {
        final boolean isBroadcast = false;
        final boolean isSingleSubpartitionContainsAllData = false;
        return createAllToAllBlockingResultInfo(
                aggregatedSubpartitionBytes, isBroadcast, isSingleSubpartitionContainsAllData);
    }

    /**
     * Creates an all-to-all result info with one partition and as many subpartitions as {@code
     * aggregatedSubpartitionBytes} has entries, then records those bytes for partition 0.
     */
    private AllToAllBlockingResultInfo createAllToAllBlockingResultInfo(
            long[] aggregatedSubpartitionBytes,
            boolean isBroadcast,
            boolean isSingleSubpartitionContainsAllData) {
        final int numPartitions = 1;
        final int numSubpartitions = aggregatedSubpartitionBytes.length;
        final AllToAllBlockingResultInfo allToAllResult =
                new AllToAllBlockingResultInfo(
                        new IntermediateDataSetID(),
                        numPartitions,
                        numSubpartitions,
                        isBroadcast,
                        isSingleSubpartitionContainsAllData);
        allToAllResult.recordPartitionInfo(
                0, new ResultPartitionBytes(aggregatedSubpartitionBytes));
        return allToAllResult;
    }

    /**
     * Creates a pointwise result info with one partition per given array and records each array
     * as the subpartition bytes of the partition with the same index.
     *
     * @param subpartitionBytesByPartition per-partition subpartition byte counts; all arrays must
     *     have the same length
     * @return the populated result info
     * @throws IllegalStateException if the arrays do not all share one length (including when no
     *     array is given)
     */
    private PointwiseBlockingResultInfo createPointwiseBlockingResultInfo(
            long[]... subpartitionBytesByPartition) {
        // Collect the distinct array lengths; exactly one distinct value means all partitions agree.
        final Set<Integer> subpartitionNumSet =
                Arrays.stream(subpartitionBytesByPartition)
                        .map(subpartitionBytes -> subpartitionBytes.length)
                        .collect(Collectors.toSet());
        Preconditions.checkState(
                subpartitionNumSet.size() == 1,
                "All partitions must have the same number of subpartitions.");

        final int numSubpartitions = subpartitionNumSet.iterator().next();
        final int numPartitions = subpartitionBytesByPartition.length;
        final PointwiseBlockingResultInfo resultInfo =
                new PointwiseBlockingResultInfo(
                        new IntermediateDataSetID(), numPartitions, numSubpartitions);

        for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
            resultInfo.recordPartitionInfo(
                    partitionIndex,
                    new ResultPartitionBytes(subpartitionBytesByPartition[partitionIndex]));
        }
        return resultInfo;
    }

    /**
     * Creates a testing result info reporting {@code producedBytes}, with both the broadcast flag
     * and the single-subpartition-contains-all-data flag set to true.
     */
    private static BlockingResultInfo createFromBroadcastResult(long producedBytes) {
        final boolean isBroadcast = true;
        final boolean singleSubpartitionContainsAllData = true;
        return new TestingBlockingResultInfo(
                isBroadcast, singleSubpartitionContainsAllData, producedBytes);
    }

    /**
     * Creates a testing result info reporting {@code producedBytes}, with both the broadcast flag
     * and the single-subpartition-contains-all-data flag set to false.
     */
    private static BlockingResultInfo createFromNonBroadcastResult(long producedBytes) {
        final boolean isBroadcast = false;
        final boolean singleSubpartitionContainsAllData = false;
        return new TestingBlockingResultInfo(
                isBroadcast, singleSubpartitionContainsAllData, producedBytes);
    }

    /**
     * Wraps a result info in a {@link BlockingInputInfo} with index 0. Both key-correlation flags
     * are true exactly when the result info is an {@link AllToAllBlockingResultInfo}.
     */
    public static BlockingInputInfo toBlockingInputInfoView(BlockingResultInfo blockingResultInfo) {
        // Inter-input and intra-input correlation are derived from the same instanceof check.
        final boolean isAllToAll = blockingResultInfo instanceof AllToAllBlockingResultInfo;
        return new BlockingInputInfo(blockingResultInfo, 0, isAllToAll, isAllToAll);
    }

    /**
     * Converts each result info with {@link #toBlockingInputInfoView(BlockingResultInfo)},
     * preserving order, and returns the views in a new {@link ArrayList}.
     */
    public static List<BlockingInputInfo> toBlockingInputInfoViews(
            List<BlockingResultInfo> blockingResultInfos) {
        return blockingResultInfos.stream()
                .map(DefaultVertexParallelismAndInputInfosDeciderTest::toBlockingInputInfoView)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    private static class TestingBlockingResultInfo
    implements BlockingResultInfo {
        private final boolean isBroadcast;
        private final boolean singleSubpartitionContainsAllData;
        private final long producedBytes;
        private final int numPartitions;
        private final int numSubpartitions;

        private TestingBlockingResultInfo(boolean isBroadcast, boolean singleSubpartitionContainsAllData, long producedBytes) {
            this(isBroadcast, singleSubpartitionContainsAllData, producedBytes, 100, 100);
        }

        private TestingBlockingResultInfo(boolean isBroadcast, boolean singleSubpartitionContainsAllData, long producedBytes, int numPartitions, int numSubpartitions) {
            this.isBroadcast = isBroadcast;
            this.singleSubpartitionContainsAllData = singleSubpartitionContainsAllData;
            this.producedBytes = producedBytes;
            this.numPartitions = numPartitions;
            this.numSubpartitions = numSubpartitions;
        }

        public IntermediateDataSetID getResultId() {
            return new IntermediateDataSetID();
        }

        public boolean isBroadcast() {
            return this.isBroadcast;
        }

        public boolean isSingleSubpartitionContainsAllData() {
            return this.singleSubpartitionContainsAllData;
        }

        public boolean isPointwise() {
            return false;
        }

        public int getNumPartitions() {
            return this.numPartitions;
        }

        public int getNumSubpartitions(int partitionIndex) {
            return this.numSubpartitions;
        }

        public long getNumBytesProduced() {
            return this.producedBytes;
        }

        public long getNumBytesProduced(IndexRange partitionIndexRange, IndexRange subpartitionIndexRange) {
            throw new UnsupportedOperationException();
        }

        public void recordPartitionInfo(int partitionIndex, ResultPartitionBytes partitionBytes) {
        }

        public void resetPartitionInfo(int partitionIndex) {
        }

        public Map<Integer, long[]> getSubpartitionBytesByPartitionIndex() {
            return Map.of();
        }
    }
}

