/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.TestingBatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecDynamicFilteringDataCollector;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.processor.utils.InputPriorityConflictResolver;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link InputPriorityConflictResolver}.
 *
 * <p>Every test template runs once per (batch shuffle mode, stream exchange mode) pair returned by
 * {@link #parameters()}. The shuffle mode is written to the configuration under {@link
 * ExecutionOptions#BATCH_SHUFFLE_MODE}; the exchange mode is handed to the resolver and is the
 * mode expected on every exchange the tests inspect.
 */
@ExtendWith(ParameterizedTestExtension.class)
class InputPriorityConflictResolverTest {

    /** Pair under test: {@code f0} goes into the configuration, {@code f1} into the resolver. */
    @Parameter
    public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode;

    /**
     * Supplies the (batch shuffle mode, stream exchange mode) combinations to run against.
     *
     * @return the parameter pairs, in execution order
     */
    @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}")
    public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() {
        // No casts on the arguments: Tuple2.of must infer the concrete enum types so that the
        // result matches the declared element type of the returned collection.
        return Arrays.asList(
                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH),
                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH),
                Tuple2.of(
                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, StreamExchangeMode.BATCH),
                Tuple2.of(
                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
                        StreamExchangeMode.HYBRID_FULL),
                Tuple2.of(
                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
                        StreamExchangeMode.HYBRID_SELECTIVE));
    }

    /**
     * Wires nine testing nodes into a graph rooted at node 7 and runs the resolver on it.
     *
     * <p>Edges (source -> target, priority): 0->1 (0), 1->2 (0), 0->3 (0), 8->4 (0), 0->4 (0),
     * 8->5 (0), 0->5 (0, END_INPUT dam behavior), and into node 7: 1 (0), 2 (0), 3 (1), 4 (10),
     * 5 (10), 6 (100).
     *
     * <p>Expects inputs 0, 1, 4 and 5 of node 7 to be left untouched, while inputs 2 and 3 are
     * replaced by {@link BatchExecExchange}s that carry the exchange mode under test and read
     * from nodes 3 and 4 respectively.
     */
    @TestTemplate
    void testDetectAndResolve() {
        TestingBatchExecNode[] nodes = createNodes(9);
        nodes[1].addInput(nodes[0], withPriority(0));
        nodes[2].addInput(nodes[1], withPriority(0));
        nodes[3].addInput(nodes[0], withPriority(0));
        nodes[4].addInput(nodes[8], withPriority(0));
        nodes[4].addInput(nodes[0], withPriority(0));
        nodes[5].addInput(nodes[8], withPriority(0));
        nodes[5].addInput(
                nodes[0],
                InputProperty.builder()
                        .damBehavior(InputProperty.DamBehavior.END_INPUT)
                        .priority(0)
                        .build());
        nodes[7].addInput(nodes[1], withPriority(0));
        nodes[7].addInput(nodes[2], withPriority(0));
        nodes[7].addInput(nodes[3], withPriority(1));
        nodes[7].addInput(nodes[4], withPriority(10));
        nodes[7].addInput(nodes[5], withPriority(10));
        nodes[7].addInput(nodes[6], withPriority(100));

        StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1;
        createResolver(nodes[7], shuffleModeConfig()).detectAndResolve();

        Assertions.assertThat(nodes[7].getInputNodes().get(0)).isEqualTo(nodes[1]);
        Assertions.assertThat(nodes[7].getInputNodes().get(1)).isEqualTo(nodes[2]);

        Assertions.assertThat(nodes[7].getInputNodes().get(2))
                .isInstanceOf(BatchExecExchange.class);
        Assertions.assertThat(
                        ((BatchExecExchange) nodes[7].getInputNodes().get(2))
                                .getRequiredExchangeMode())
                .isEqualTo(Optional.of(streamExchangeMode));
        Assertions.assertThat(
                        nodes[7].getInputNodes().get(2).getInputEdges().get(0).getSource())
                .isEqualTo(nodes[3]);

        Assertions.assertThat(nodes[7].getInputNodes().get(3))
                .isInstanceOf(BatchExecExchange.class);
        Assertions.assertThat(
                        ((BatchExecExchange) nodes[7].getInputNodes().get(3))
                                .getRequiredExchangeMode())
                .isEqualTo(Optional.of(streamExchangeMode));
        Assertions.assertThat(
                        nodes[7].getInputNodes().get(3).getInputEdges().get(0).getSource())
                .isEqualTo(nodes[4]);

        Assertions.assertThat(nodes[7].getInputNodes().get(4)).isEqualTo(nodes[5]);
        Assertions.assertThat(nodes[7].getInputNodes().get(5)).isEqualTo(nodes[6]);
    }

    /**
     * Feeds one and the same exchange (reading from node 0) into node 1 twice, with priorities 0
     * and 1, and runs the resolver rooted at node 1.
     *
     * <p>Expects the two inputs of node 1 to be distinct instances afterwards, both of them
     * {@link BatchExecExchange}s with the exchange mode under test and node 0 as their source.
     */
    @TestTemplate
    void testDeadlockCausedByExchange() {
        TestingBatchExecNode[] nodes = createNodes(2);
        ReadableConfig configuration = shuffleModeConfig();
        StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1;

        BatchExecExchange exchange =
                new BatchExecExchange(
                        configuration,
                        InputProperty.builder()
                                .requiredDistribution(InputProperty.ANY_DISTRIBUTION)
                                .build(),
                        (RowType) nodes[0].getOutputType(),
                        "Exchange");
        exchange.setRequiredExchangeMode(streamExchangeMode);
        ExecEdge execEdge = ExecEdge.builder().source(nodes[0]).target(exchange).build();
        exchange.setInputEdges(Collections.singletonList(execEdge));

        // The same exchange instance is registered as both inputs of node 1.
        nodes[1].addInput(exchange, withPriority(0));
        nodes[1].addInput(exchange, withPriority(1));

        createResolver(nodes[1], configuration).detectAndResolve();

        ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
        ExecNode<?> input1 = nodes[1].getInputNodes().get(1);
        Assertions.assertThat(input1).isNotSameAs(input0);

        Consumer<ExecNode<?>> checkExchange =
                execNode -> {
                    Assertions.assertThat(execNode).isInstanceOf(BatchExecExchange.class);
                    BatchExecExchange e = (BatchExecExchange) execNode;
                    Assertions.assertThat(e.getRequiredExchangeMode())
                            .isEqualTo(Optional.of(streamExchangeMode));
                    Assertions.assertThat(e.getInputEdges().get(0).getSource())
                            .isEqualTo(nodes[0]);
                };
        checkExchange.accept(input0);
        checkExchange.accept(input1);
    }

    /**
     * Builds a plan in which node 1 reads node 2 directly (priority 1) and also reads a table
     * source scan (priority 0) whose only input is a dynamic filtering data collector fed by
     * node 2; node 0 consumes node 1.
     *
     * <p>Expects the resolver, rooted at node 1, to leave both inputs of node 1 as the very same
     * instances (node 2 and the scan).
     */
    @TestTemplate
    void testWithDynamicFilteringPlan() {
        TestingBatchExecNode[] nodes = createNodes(3);

        BatchExecTableSourceScan scan =
                new BatchExecTableSourceScan(
                        new Configuration(),
                        new DynamicTableSourceSpec(null, null),
                        InputProperty.DEFAULT,
                        RowType.of(new LogicalType[] {new IntType(), new IntType(), new IntType()}),
                        "DynamicFilteringTableSourceScan");
        BatchExecDynamicFilteringDataCollector collector =
                new BatchExecDynamicFilteringDataCollector(
                        Collections.singletonList(1),
                        new Configuration(),
                        InputProperty.DEFAULT,
                        RowType.of(new LogicalType[] {new IntType()}),
                        "DynamicFilteringDataCollector");

        nodes[0].addInput(nodes[1], withPriority(0));
        nodes[1].addInput(nodes[2], withPriority(1));
        nodes[1].addInput(scan, withPriority(0));

        ExecEdge collect2Scan = ExecEdge.builder().source(collector).target(scan).build();
        scan.setInputEdges(Collections.singletonList(collect2Scan));
        ExecEdge toCollector = ExecEdge.builder().source(nodes[2]).target(collector).build();
        collector.setInputEdges(Collections.singletonList(toCollector));

        createResolver(nodes[1], shuffleModeConfig()).detectAndResolve();

        ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
        ExecNode<?> input1 = nodes[1].getInputNodes().get(1);
        Assertions.assertThat(input0).isSameAs(nodes[2]);
        Assertions.assertThat(input1).isSameAs(scan);
    }

    /** Creates {@code count} testing nodes named {@code TestingBatchExecNode<index>}. */
    private static TestingBatchExecNode[] createNodes(int count) {
        TestingBatchExecNode[] nodes = new TestingBatchExecNode[count];
        for (int i = 0; i < count; i++) {
            nodes[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
        }
        return nodes;
    }

    /** Builds an input property that only sets the given priority. */
    private static InputProperty withPriority(int priority) {
        return InputProperty.builder().priority(priority).build();
    }

    /** Creates a fresh configuration whose batch shuffle mode is the one under test. */
    private ReadableConfig shuffleModeConfig() {
        Configuration configuration = new Configuration();
        configuration.set(
                ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleModeAndStreamExchangeMode.f0);
        return configuration;
    }

    /**
     * Creates a resolver for the single given root, using END_INPUT as the dam behavior argument
     * and the exchange mode under test.
     */
    private InputPriorityConflictResolver createResolver(
            ExecNode<?> root, ReadableConfig configuration) {
        return new InputPriorityConflictResolver(
                Collections.singletonList(root),
                InputProperty.DamBehavior.END_INPUT,
                batchShuffleModeAndStreamExchangeMode.f1,
                configuration);
    }
}

