/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.scheduler.strategy.DefaultInputConsumableDecider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DefaultInputConsumableDeciderTest {
    DefaultInputConsumableDeciderTest() {
    }

    @Test
    void testNotFinishedBlockingInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumer).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        DefaultInputConsumableDecider inputConsumableDecider = this.createDefaultInputConsumableDecider(Collections.emptySet(), topology);
        consumer.forEach(vertex -> vertex.getConsumedPartitionGroups().forEach(group -> Assertions.assertThat((boolean)inputConsumableDecider.isConsumableBasedOnFinishedProducers(group)).isFalse()));
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(0), Collections.emptySet(), new HashMap())).isFalse();
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(1), Collections.emptySet(), new HashMap())).isFalse();
    }

    @Test
    void testAllFinishedBlockingInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumer).withResultPartitionState(ResultPartitionState.ALL_DATA_PRODUCED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        DefaultInputConsumableDecider inputConsumableDecider = this.createDefaultInputConsumableDecider(Collections.emptySet(), topology);
        consumer.forEach(vertex -> vertex.getConsumedPartitionGroups().forEach(group -> Assertions.assertThat((boolean)inputConsumableDecider.isConsumableBasedOnFinishedProducers(group)).isTrue()));
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(0), Collections.emptySet(), new HashMap())).isTrue();
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(1), Collections.emptySet(), new HashMap())).isTrue();
    }

    @Test
    void testUpstreamNotScheduledHybridInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumer).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.HYBRID_FULL).finish();
        DefaultInputConsumableDecider inputConsumableDecider = this.createDefaultInputConsumableDecider(Collections.emptySet(), topology);
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(0), Collections.emptySet(), new HashMap())).isFalse();
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(1), Collections.emptySet(), new HashMap())).isFalse();
    }

    @Test
    void testUpstreamAllScheduledHybridInput() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers = topology.addExecutionVertices().withParallelism(2).finish();
        List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices().withParallelism(2).finish();
        topology.connectAllToAll(producers, consumer).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.HYBRID_FULL).finish();
        HashSet<ExecutionVertexID> scheduledVertices = new HashSet<ExecutionVertexID>();
        DefaultInputConsumableDecider inputConsumableDecider = this.createDefaultInputConsumableDecider(scheduledVertices, topology);
        scheduledVertices.add(producers.get(0).getId());
        HashSet<ExecutionVertexID> vertexToDeploy = new HashSet<ExecutionVertexID>();
        vertexToDeploy.add(producers.get(1).getId());
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(0), vertexToDeploy, new HashMap())).isTrue();
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(1), vertexToDeploy, new HashMap())).isTrue();
    }

    @Test
    void testHybridAndBlockingInputButBlockingInputNotFinished() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        List<TestingSchedulingExecutionVertex> producers1 = topology.addExecutionVertices().withParallelism(1).finish();
        List<TestingSchedulingExecutionVertex> producers2 = topology.addExecutionVertices().withParallelism(1).finish();
        List<TestingSchedulingExecutionVertex> consumer = topology.addExecutionVertices().withParallelism(1).finish();
        topology.connectAllToAll(producers1, consumer).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.BLOCKING).finish();
        topology.connectAllToAll(producers2, consumer).withResultPartitionState(ResultPartitionState.CREATED).withResultPartitionType(ResultPartitionType.HYBRID_FULL).finish();
        DefaultInputConsumableDecider inputConsumableDecider = this.createDefaultInputConsumableDecider(Collections.singleton(producers2.get(0).getId()), topology);
        Assertions.assertThat((boolean)inputConsumableDecider.isInputConsumable((SchedulingExecutionVertex)consumer.get(0), Collections.emptySet(), new HashMap())).isFalse();
    }

    private DefaultInputConsumableDecider createDefaultInputConsumableDecider(Set<ExecutionVertexID> scheduledVertices, SchedulingTopology schedulingTopology) {
        return new DefaultInputConsumableDecider(scheduledVertices::contains, arg_0 -> ((SchedulingTopology)schedulingTopology).getResultPartition(arg_0), id -> schedulingTopology.getVertex(id).getState());
    }
}

