/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamGraphGeneratorBatchExecutionTest {
    /**
     * No-op keyed process function shared by the tests in this class. It only serves to place a
     * keyed one-input operator into the pipeline; the callback body is empty.
     */
    private static final KeyedProcessFunction<Integer, Integer, Integer> DUMMY_PROCESS_FUNCTION =
            new KeyedProcessFunction<Integer, Integer, Integer>() {

                // "Context" is the member type inherited from the parameterized super class, so
                // the signature matches the abstract method exactly and @Override is accepted.
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    // Intentionally empty.
                }
            };
    /**
     * No-op keyed co-process function shared by the two-input tests in this class. Both callbacks
     * are empty; it only serves to place a keyed two-input operator into the pipeline.
     */
    private static final KeyedCoProcessFunction<Integer, Integer, Integer, Integer>
            DUMMY_KEYED_CO_PROCESS_FUNCTION =
                    new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {

                        // "Context" is the member type inherited from the parameterized super
                        // class, so both signatures match the abstract methods exactly.
                        @Override
                        public void processElement1(
                                Integer value, Context ctx, Collector<Integer> out) {
                            // Intentionally empty.
                        }

                        @Override
                        public void processElement2(
                                Integer value, Context ctx, Collector<Integer> out) {
                            // Intentionally empty.
                        }
                    };

    /** Package-private no-argument constructor with an empty body. */
    StreamGraphGeneratorBatchExecutionTest() {
    }

    /**
     * Runs {@code testGlobalStreamExchangeMode} for several combinations of runtime execution mode
     * and batch shuffle mode, each paired with the exchange mode the generated graph must report.
     */
    @Test
    void testShuffleMode() {
        // AUTOMATIC + blocking shuffle -> all edges blocking.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.AUTOMATIC,
                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
                GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);

        // STREAMING + blocking shuffle -> all edges pipelined.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.STREAMING,
                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
                GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);

        // BATCH: the expected exchange mode mirrors the configured shuffle mode.
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_PIPELINED,
                GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL,
                GlobalStreamExchangeMode.ALL_EDGES_HYBRID_FULL);
        testGlobalStreamExchangeMode(
                RuntimeExecutionMode.BATCH,
                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
                GlobalStreamExchangeMode.ALL_EDGES_HYBRID_SELECTIVE);
    }

    /** Checks that a graph generated in batch mode reports {@link JobType#BATCH} as its job type. */
    @Test
    void testBatchJobType() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamGraph batchGraph = getStreamGraphInBatchMode(addDummyPipeline(environment));

        Assertions.assertThat(batchGraph.getJobType()).isEqualTo(JobType.BATCH);
    }

    /**
     * Checks the managed-memory declarations of a keyed process node in batch mode with default
     * settings: the operator-scope weights must contain exactly {@code OPERATOR} mapped to the
     * default of {@link ExecutionOptions#SORTED_INPUTS_MEMORY} in MiB, and the slot-scope use
     * cases must contain only {@code STATE_BACKEND}.
     */
    @Test
    void testManagedMemoryWeights() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        StreamGraph graph = getStreamGraphInBatchMode(sink);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Map<ManagedMemoryUseCase, Integer> expectedOperatorWeights = new HashMap<>();
        expectedOperatorWeights.put(
                ManagedMemoryUseCase.OPERATOR,
                ExecutionOptions.SORTED_INPUTS_MEMORY.defaultValue().getMebiBytes());

        Assertions.assertThat(processNode.getManagedMemoryOperatorScopeUseCaseWeights())
                .isEqualTo(expectedOperatorWeights);
        Assertions.assertThat(processNode.getManagedMemorySlotScopeUseCases())
                .containsOnly(ManagedMemoryUseCase.STATE_BACKEND);
    }

    /**
     * Checks that a custom {@link ExecutionOptions#SORTED_INPUTS_MEMORY} value (42 MiB) is used as
     * the {@code OPERATOR} managed-memory weight of the keyed process node, while the slot-scope
     * use cases still contain only {@code STATE_BACKEND}.
     */
    @Test
    void testCustomManagedMemoryWeights() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        // No (Object) cast: the value must have the option's own type for T to be inferred.
        configuration.set(ExecutionOptions.SORTED_INPUTS_MEMORY, MemorySize.ofMebiBytes(42L));

        StreamGraph graph = getStreamGraphInBatchMode(sink, configuration);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Map<ManagedMemoryUseCase, Integer> expectedOperatorWeights = new HashMap<>();
        expectedOperatorWeights.put(ManagedMemoryUseCase.OPERATOR, 42);

        Assertions.assertThat(processNode.getManagedMemoryOperatorScopeUseCaseWeights())
                .isEqualTo(expectedOperatorWeights);
        Assertions.assertThat(processNode.getManagedMemorySlotScopeUseCases())
                .containsOnly(ManagedMemoryUseCase.STATE_BACKEND);
    }

    /**
     * Checks the batch settings applied to a keyed one-input operator with default configuration:
     * input 0 is {@code SORTED}, the chaining strategy is {@code HEAD}, the graph uses a
     * {@link BatchExecutionStateBackend} and a timer service provider is set.
     */
    @Test
    void testOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        StreamGraph graph = getStreamGraphInBatchMode(sink);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * Checks a keyed one-input operator when {@link ExecutionOptions#USE_BATCH_STATE_BACKEND} is
     * {@code false}: input 0 is still {@code SORTED} and the chaining strategy is {@code HEAD}, but
     * the graph has neither a state backend nor a timer service provider.
     */
    @Test
    void testDisablingStateBackendOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);

        StreamGraph graph = getStreamGraphInBatchMode(sink, configuration);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Checks a keyed one-input operator when both {@link ExecutionOptions#USE_BATCH_STATE_BACKEND}
     * and {@link ExecutionOptions#SORT_INPUTS} are {@code false}: no requirement is registered for
     * input 0 and the graph has neither a state backend nor a timer service provider.
     */
    @Test
    void testDisablingSortingInputsOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        configuration.set(ExecutionOptions.SORT_INPUTS, false);

        StreamGraph graph = getStreamGraphInBatchMode(sink, configuration);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0)).isNull();
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Checks that disabling {@link ExecutionOptions#SORT_INPUTS} while leaving the batch state
     * backend option untouched makes graph generation fail with an {@link IllegalStateException}
     * for a keyed one-input operator.
     */
    @Test
    void testDisablingSortingInputsWithoutBatchStateBackendOneInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        SingleOutputStreamOperator<Integer> process =
                env.fromData(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.SORT_INPUTS, false);

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink, configuration))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend requires the sorted inputs to be enabled!");
    }

    /**
     * Checks the batch settings applied to a keyed two-input operator with default configuration:
     * inputs 0 and 1 are {@code SORTED}, the chaining strategy is {@code HEAD}, the graph uses a
     * {@link BatchExecutionStateBackend} and a timer service provider is set.
     */
    @Test
    void testTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed sources are required for the Integer::intValue key selectors to type-check.
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        SingleOutputStreamOperator<Integer> process =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        StreamGraph graph = getStreamGraphInBatchMode(sink);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * Checks a keyed two-input operator when {@link ExecutionOptions#USE_BATCH_STATE_BACKEND} is
     * {@code false}: both inputs are still {@code SORTED} and the chaining strategy is
     * {@code HEAD}, but the graph has neither a state backend nor a timer service provider.
     */
    @Test
    void testDisablingStateBackendTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed sources are required for the Integer::intValue key selectors to type-check.
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        SingleOutputStreamOperator<Integer> process =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);

        StreamGraph graph = getStreamGraphInBatchMode(sink, configuration);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(processNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Checks a keyed two-input operator when both {@link ExecutionOptions#USE_BATCH_STATE_BACKEND}
     * and {@link ExecutionOptions#SORT_INPUTS} are {@code false}: no requirement is registered for
     * either input and the graph has neither a state backend nor a timer service provider.
     */
    @Test
    void testDisablingSortingInputsTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed sources are required for the Integer::intValue key selectors to type-check.
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        SingleOutputStreamOperator<Integer> process =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, false);
        configuration.set(ExecutionOptions.SORT_INPUTS, false);

        StreamGraph graph = getStreamGraphInBatchMode(sink, configuration);
        StreamNode processNode = graph.getStreamNode(process.getId());

        Assertions.assertThat(processNode.getInputRequirements().get(0)).isNull();
        Assertions.assertThat(processNode.getInputRequirements().get(1)).isNull();
        Assertions.assertThat(graph.getStateBackend()).isNull();
        Assertions.assertThat(graph.getTimerServiceProvider()).isNull();
    }

    /**
     * Checks that disabling {@link ExecutionOptions#SORT_INPUTS} while leaving the batch state
     * backend option untouched makes graph generation fail with an {@link IllegalStateException}
     * for a keyed two-input operator.
     */
    @Test
    void testDisablingSortingInputsWithoutBatchStateBackendTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed sources are required for the Integer::intValue key selectors to type-check.
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        SingleOutputStreamOperator<Integer> process =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);
        DataStreamSink<Integer> sink = process.sinkTo(new DiscardingSink<>());

        Configuration configuration = new Configuration();
        configuration.set(ExecutionOptions.SORT_INPUTS, false);

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink, configuration))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend requires the sorted inputs to be enabled!");
    }

    /**
     * Checks that batch graph generation fails with an {@link IllegalStateException} when the
     * pipeline contains a keyed two-input operator that implements {@link InputSelectable}.
     */
    @Test
    void testInputSelectableTwoInputTransformation() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Typed sources are required for the Integer::intValue key selectors to type-check.
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        SingleOutputStreamOperator<Integer> process =
                elements1
                        .connect(elements2)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .process(DUMMY_KEYED_CO_PROCESS_FUNCTION);

        // The InputSelectable operator consumes the co-process output and the first source again.
        SingleOutputStreamOperator<Integer> selectableOperator =
                process.connect(elements1)
                        .keyBy(Integer::intValue, Integer::intValue)
                        .transform(
                                "operator",
                                BasicTypeInfo.INT_TYPE_INFO,
                                new InputSelectableTwoInputOperator());
        DataStreamSink<Integer> sink = selectableOperator.sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
    }

    /**
     * Checks the batch settings applied to a keyed multiple-input operator with three inputs and
     * default configuration: every input is {@code SORTED}, the chaining strategy is {@code HEAD},
     * the graph uses a {@link BatchExecutionStateBackend} and a timer service provider is set.
     */
    @Test
    void testMultiInputTransformation() {
        final int inputsCount = 3;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> elements1 = env.fromData(1, 2);
        DataStreamSource<Integer> elements2 = env.fromData(1, 2);
        DataStreamSource<Integer> elements3 = env.fromData(1, 2);

        // selectable == false: the created operator does not implement InputSelectable.
        MultipleInputOperatorFactory operatorFactory =
                new MultipleInputOperatorFactory(inputsCount, false);
        KeyedMultipleInputTransformation<Integer> multipleInputTransformation =
                new KeyedMultipleInputTransformation<>(
                        "operator",
                        operatorFactory,
                        BasicTypeInfo.INT_TYPE_INFO,
                        1,
                        BasicTypeInfo.INT_TYPE_INFO);
        multipleInputTransformation.addInput(elements1.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements2.getTransformation(), e -> e);
        multipleInputTransformation.addInput(elements3.getTransformation(), e -> e);

        DataStreamSink<Integer> sink =
                new MultipleConnectedStreams(env)
                        .transform(multipleInputTransformation)
                        .sinkTo(new DiscardingSink<>());

        StreamGraph graph = getStreamGraphInBatchMode(sink);
        StreamNode operatorNode = graph.getStreamNode(multipleInputTransformation.getId());

        // All wired inputs carry a key selector, so each of them is expected to be sorted.
        for (int inputIndex = 0; inputIndex < inputsCount; inputIndex++) {
            Assertions.assertThat(operatorNode.getInputRequirements().get(inputIndex))
                    .as("input requirement of input %d", inputIndex)
                    .isEqualTo(StreamConfig.InputRequirement.SORTED);
        }
        Assertions.assertThat(operatorNode.getOperatorFactory().getChainingStrategy())
                .isEqualTo(ChainingStrategy.HEAD);
        Assertions.assertThat(graph.getStateBackend())
                .isInstanceOf(BatchExecutionStateBackend.class);
        Assertions.assertThat(graph.getTimerServiceProvider()).isNotNull();
    }

    /**
     * Checks that batch graph generation fails with an {@link IllegalStateException} when a keyed
     * multiple-input transformation is backed by an operator that implements
     * {@link InputSelectable}.
     */
    @Test
    void testInputSelectableMultiInputTransformation() {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> firstSource = environment.fromData(1, 2);
        final DataStreamSource<Integer> secondSource = environment.fromData(1, 2);
        final DataStreamSource<Integer> thirdSource = environment.fromData(1, 2);

        // selectable == true: the factory creates the InputSelectable operator variant.
        final MultipleInputOperatorFactory selectableFactory =
                new MultipleInputOperatorFactory(3, true);
        final KeyedMultipleInputTransformation<Integer> keyedTransformation =
                new KeyedMultipleInputTransformation<>(
                        "operator",
                        selectableFactory,
                        BasicTypeInfo.INT_TYPE_INFO,
                        1,
                        BasicTypeInfo.INT_TYPE_INFO);
        keyedTransformation.addInput(firstSource.getTransformation(), element -> element);
        keyedTransformation.addInput(secondSource.getTransformation(), element -> element);
        keyedTransformation.addInput(thirdSource.getTransformation(), element -> element);

        final DataStreamSink<Integer> sink =
                new MultipleConnectedStreams(environment)
                        .transform(keyedTransformation)
                        .sinkTo(new DiscardingSink<>());

        Assertions.assertThatThrownBy(() -> getStreamGraphInBatchMode(sink))
                .isInstanceOf(IllegalStateException.class)
                .hasMessageContaining(
                        "Batch state backend and sorting inputs are not supported in graphs with an InputSelectable operator.");
    }

    /**
     * Builds a {@link StreamGraphGenerator} in {@code BATCH} runtime mode from the given
     * transformations and asserts that generating the graph throws an
     * {@link UnsupportedOperationException} stating that iterations are not supported.
     *
     * <p>NOTE(review): no test in this class currently calls this helper.
     *
     * @param transformations the transformations to register with the generator
     */
    private void testNoSupportForIterationsInBatchHelper(Transformation<?>... transformations) {
        List<Transformation<?>> registeredTransformations = new ArrayList<>();
        Collections.addAll(registeredTransformations, transformations);

        Configuration configuration = new Configuration();
        // No (Object) cast: the value must have the option's own type for T to be inferred.
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        StreamGraphGenerator streamGraphGenerator =
                new StreamGraphGenerator(
                        registeredTransformations,
                        new ExecutionConfig(),
                        new CheckpointConfig(),
                        configuration);

        Assertions.assertThatThrownBy(streamGraphGenerator::generate)
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Iterations are not supported in BATCH execution mode.");
    }

    /**
     * Generates a graph for the dummy pipeline with the given runtime mode and batch shuffle mode
     * and asserts the resulting global stream exchange mode.
     *
     * @param runtimeExecutionMode value for {@link ExecutionOptions#RUNTIME_MODE}
     * @param shuffleMode value for {@link ExecutionOptions#BATCH_SHUFFLE_MODE}
     * @param expectedStreamExchangeMode exchange mode the generated graph must report
     */
    private void testGlobalStreamExchangeMode(
            RuntimeExecutionMode runtimeExecutionMode,
            BatchShuffleMode shuffleMode,
            GlobalStreamExchangeMode expectedStreamExchangeMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSink<Integer> sink = addDummyPipeline(env);

        Configuration configuration = new Configuration();
        // No (Object) casts: each value must have the option's own type for T to be inferred.
        configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);

        StreamGraphGenerator streamGraphGenerator =
                new StreamGraphGenerator(
                        Collections.singletonList(sink.getTransformation()),
                        new ExecutionConfig(),
                        new CheckpointConfig(),
                        configuration);
        StreamGraph graph = streamGraphGenerator.generate();

        Assertions.assertThat(graph.getGlobalStreamExchangeMode())
                .isEqualTo(expectedStreamExchangeMode);
    }

    /**
     * Adds a minimal keyed pipeline to the environment: two integer elements, keyed by their own
     * value, passed through the no-op process function and written to a discarding sink.
     *
     * @param env the environment to build the pipeline in
     * @return the sink that terminates the pipeline
     */
    private DataStreamSink<Integer> addDummyPipeline(StreamExecutionEnvironment env) {
        // fromData(1, 2) keeps the element type Integer so keyBy(Integer::intValue) type-checks.
        return env.fromData(1, 2)
                .keyBy(Integer::intValue)
                .process(DUMMY_PROCESS_FUNCTION)
                .sinkTo(new DiscardingSink<>());
    }

    /**
     * Generates the batch-mode stream graph for the given sink, starting from a fresh, empty
     * {@link Configuration}.
     *
     * @param sink the sink whose transformation is the root of the generated graph
     * @return the generated stream graph
     */
    private StreamGraph getStreamGraphInBatchMode(DataStreamSink<?> sink) {
        final Configuration emptyConfiguration = new Configuration();
        return getStreamGraphInBatchMode(sink, emptyConfiguration);
    }

    /**
     * Generates the stream graph for the given sink with {@code BATCH} runtime mode.
     *
     * <p>The execution and checkpoint configs are configured from {@code configuration} first;
     * afterwards {@link ExecutionOptions#RUNTIME_MODE} is set to {@code BATCH} on the passed
     * {@code configuration} instance itself, which is then handed to the generator.
     *
     * @param sink the sink whose transformation is the root of the generated graph
     * @param configuration settings for graph generation; modified by this method
     * @return the generated stream graph
     */
    private StreamGraph getStreamGraphInBatchMode(
            DataStreamSink<?> sink, Configuration configuration) {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.configure(configuration, StreamGraphGenerator.class.getClassLoader());

        CheckpointConfig checkpointConfig = new CheckpointConfig();
        checkpointConfig.configure(configuration);

        // No (Object) cast: the value must have the option's own type for T to be inferred.
        configuration.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);

        StreamGraphGenerator graphGenerator =
                new StreamGraphGenerator(
                        Collections.singletonList(sink.getTransformation()),
                        executionConfig,
                        checkpointConfig,
                        configuration);
        return graphGenerator.generate();
    }

    private static final class InputSelectableTwoInputOperator
    extends AbstractStreamOperator<Integer>
    implements TwoInputStreamOperator<Integer, Integer, Integer>,
    InputSelectable {
        private InputSelectableTwoInputOperator() {
        }

        public InputSelection nextSelection() {
            return null;
        }

        public void processElement1(StreamRecord<Integer> element) {
        }

        public void processElement2(StreamRecord<Integer> element) {
        }
    }

    private static final class MultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<Integer> {
        private final int inputsCount;
        private final boolean selectable;

        private MultipleInputOperatorFactory(int inputsCount, boolean selectable) {
            this.inputsCount = inputsCount;
            this.selectable = selectable;
        }

        public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
            if (this.selectable) {
                return (T)((Object)new InputSelectableMultipleInputOperator(parameters, this.inputsCount));
            }
            return (T)((Object)new MultipleInputOperator(parameters, this.inputsCount));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            if (this.selectable) {
                return InputSelectableMultipleInputOperator.class;
            }
            return MultipleInputOperator.class;
        }
    }

    private static final class InputSelectableMultipleInputOperator
    extends MultipleInputOperator
    implements InputSelectable {
        public InputSelectableMultipleInputOperator(StreamOperatorParameters<Integer> parameters, int inputsCount) {
            super(parameters, inputsCount);
        }

        public InputSelection nextSelection() {
            return null;
        }
    }

    private static class MultipleInputOperator
    extends AbstractStreamOperatorV2<Integer>
    implements MultipleInputStreamOperator<Integer> {
        public MultipleInputOperator(StreamOperatorParameters<Integer> parameters, int inputsCount) {
            super(parameters, inputsCount);
        }

        public List<Input> getInputs() {
            return Collections.emptyList();
        }
    }
}

