/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.lineage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
import org.apache.flink.streaming.api.lineage.DefaultLineageVertex;
import org.apache.flink.streaming.api.lineage.DefaultSourceLineageVertex;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageGraph;
import org.apache.flink.streaming.api.lineage.LineageGraphUtils;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link LineageGraphUtils#convertToLineageGraph}.
 *
 * <p>Each test wires a small source-to-sink pipeline on a {@link StreamExecutionEnvironment},
 * converts the resulting transformations into a {@link LineageGraph} and asserts on the sources,
 * sinks and relations the graph reports. The nested fixture classes at the bottom of this file are
 * connectors that implement {@link LineageVertexProvider}; plain {@link NumberSequenceSource} and
 * {@link DiscardingSink} are used as the counterparts that do not provide lineage information.
 */
class LineageGraphUtilsTest {
    private static final String SOURCE_DATASET_NAME = "LineageSource";
    private static final String SOURCE_DATASET_NAMESPACE = "source://LineageSource";
    private static final String SINK_DATASET_NAME = "LineageSink";
    private static final String SINK_DATASET_NAMESPACE = "sink://LineageSink";
    private static final String LEGACY_SOURCE_DATASET_NAME = "LineageSourceFunction";
    private static final String LEGACY_SOURCE_DATASET_NAMESPACE = "source://LineageSourceFunction";
    private static final String LEGACY_SINK_DATASET_NAME = "LineageSinkFunction";
    private static final String LEGACY_SINK_DATASET_NAMESPACE = "sink://LineageSinkFunction";

    /**
     * A legacy {@link SourceFunction} / {@link SinkFunction} pair that both provide lineage must
     * yield one unbounded source, one sink and one relation.
     */
    @Test
    void testExtractLineageGraphFromLegacyTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source = env.addSource(new LineageSourceFunction());
        DataStreamSink<Long> sink = source.addSink(new LineageSinkFunction());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));

        assertSingleSource(
                lineageGraph,
                Boundedness.CONTINUOUS_UNBOUNDED,
                LEGACY_SOURCE_DATASET_NAME,
                LEGACY_SOURCE_DATASET_NAMESPACE);
        assertSingleSink(lineageGraph, LEGACY_SINK_DATASET_NAME, LEGACY_SINK_DATASET_NAMESPACE);
        Assertions.assertThat(lineageGraph.relations()).hasSize(1);
    }

    /**
     * A {@link Source} / {@link Sink} pair that both provide lineage must yield one bounded
     * source, one sink and one relation.
     */
    @Test
    void testExtractLineageGraphFromTransformations() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source =
                env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
        DataStreamSink<Long> sink = source.sinkTo(new LineageSink());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(Arrays.asList(sink.getTransformation()));

        assertSingleSource(
                lineageGraph, Boundedness.BOUNDED, SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE);
        assertSingleSink(lineageGraph, SINK_DATASET_NAME, SINK_DATASET_NAMESPACE);
        Assertions.assertThat(lineageGraph.relations()).hasSize(1);
    }

    /**
     * When only the source provides lineage (the sink is a plain {@link DiscardingSink}), the
     * graph must contain the source but no sinks and no relations.
     */
    @Test
    void testExtractPartialLineageGraphWithSourceOnly() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source =
                env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
        source.sinkTo(new DiscardingSink<>());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(env.getTransformations());

        assertSingleSource(
                lineageGraph, Boundedness.BOUNDED, SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE);
        Assertions.assertThat(lineageGraph.sinks()).isEmpty();
        Assertions.assertThat(lineageGraph.relations()).isEmpty();
    }

    /**
     * When only the sink provides lineage (the source is a plain {@link NumberSequenceSource}),
     * the graph must contain the sink but no sources and no relations.
     */
    @Test
    void testExtractPartialLineageGraphWithSinkOnly() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source =
                env.fromSource(
                        new NumberSequenceSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
        source.sinkTo(new LineageSink());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(env.getTransformations());

        assertSingleSink(lineageGraph, SINK_DATASET_NAME, SINK_DATASET_NAMESPACE);
        Assertions.assertThat(lineageGraph.sources()).isEmpty();
        Assertions.assertThat(lineageGraph.relations()).isEmpty();
    }

    /**
     * Passing the same transformations twice must still report the lineage source only once.
     */
    @Test
    void testSourceDeduplication() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source =
                env.fromSource(new LineageSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
        source.sinkTo(new DiscardingSink<>());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(repeatedTwice(env.getTransformations()));

        Assertions.assertThat(lineageGraph.sources()).hasSize(1);
        Assertions.assertThat(lineageGraph.sinks()).isEmpty();
        Assertions.assertThat(lineageGraph.relations()).isEmpty();
    }

    /**
     * Passing the same transformations twice must still report the lineage sink only once.
     *
     * <p>NOTE: the method name contains a typo ("Duduplication"); it is kept as-is so that any
     * external reference to this test by name keeps working.
     */
    @Test
    void testSinkDuduplication() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Long> source =
                env.fromSource(
                        new NumberSequenceSource(1L, 5L), WatermarkStrategy.noWatermarks(), "");
        source.sinkTo(new LineageSink());

        LineageGraph lineageGraph =
                LineageGraphUtils.convertToLineageGraph(repeatedTwice(env.getTransformations()));

        Assertions.assertThat(lineageGraph.sources()).isEmpty();
        Assertions.assertThat(lineageGraph.sinks()).hasSize(1);
        Assertions.assertThat(lineageGraph.relations()).isEmpty();
    }

    /**
     * Returns a new list holding every element of {@code items} followed by every element of
     * {@code items} again, i.e. each entry appears twice.
     */
    private static <T> List<T> repeatedTwice(List<T> items) {
        List<T> doubled = new ArrayList<>(items);
        doubled.addAll(items);
        return doubled;
    }

    /**
     * Asserts that {@code graph} has exactly one source with the given boundedness and exactly one
     * dataset carrying the given name and namespace.
     */
    private static void assertSingleSource(
            LineageGraph graph,
            Boundedness expectedBoundedness,
            String expectedName,
            String expectedNamespace) {
        Assertions.assertThat(graph.sources()).hasSize(1);
        SourceLineageVertex sourceVertex = graph.sources().get(0);
        Assertions.assertThat(sourceVertex.boundedness()).isEqualTo(expectedBoundedness);
        assertSingleDataset(sourceVertex, expectedName, expectedNamespace);
    }

    /**
     * Asserts that {@code graph} has exactly one sink with exactly one dataset carrying the given
     * name and namespace.
     */
    private static void assertSingleSink(
            LineageGraph graph, String expectedName, String expectedNamespace) {
        Assertions.assertThat(graph.sinks()).hasSize(1);
        assertSingleDataset(graph.sinks().get(0), expectedName, expectedNamespace);
    }

    /**
     * Asserts that {@code vertex} exposes exactly one dataset with the given name and namespace.
     */
    private static void assertSingleDataset(
            LineageVertex vertex, String expectedName, String expectedNamespace) {
        Assertions.assertThat(vertex.datasets()).hasSize(1);
        LineageDataset dataset = vertex.datasets().get(0);
        Assertions.assertThat(dataset.name()).isEqualTo(expectedName);
        Assertions.assertThat(dataset.namespace()).isEqualTo(expectedNamespace);
    }

    /**
     * Legacy source function that emits an increasing sequence of longs until cancelled and
     * reports an unbounded source vertex with a single dataset.
     */
    private static class LineageSourceFunction
            implements SourceFunction<Long>, LineageVertexProvider {
        // volatile: cancel() flips this flag and the loop in run() polls it.
        private volatile boolean running = true;

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            long next = 0L;
            while (this.running) {
                // Emit while holding the checkpoint lock obtained from the context.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(next++);
                }
            }
        }

        @Override
        public void cancel() {
            this.running = false;
        }

        @Override
        public LineageVertex getLineageVertex() {
            LineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            LEGACY_SOURCE_DATASET_NAME,
                            LEGACY_SOURCE_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultSourceLineageVertex lineageVertex =
                    new DefaultSourceLineageVertex(Boundedness.CONTINUOUS_UNBOUNDED);
            lineageVertex.addDataset(lineageDataset);
            return lineageVertex;
        }
    }

    /** Legacy sink function that reports a sink vertex with a single dataset. */
    private static class LineageSinkFunction implements SinkFunction<Long>, LineageVertexProvider {
        @Override
        public LineageVertex getLineageVertex() {
            LineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            LEGACY_SINK_DATASET_NAME,
                            LEGACY_SINK_DATASET_NAMESPACE,
                            new HashMap<>());
            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
            lineageVertex.addLineageDataset(lineageDataset);
            return lineageVertex;
        }
    }

    /**
     * {@link NumberSequenceSource} that additionally reports a bounded source vertex with a single
     * dataset.
     */
    private static class LineageSource extends NumberSequenceSource
            implements LineageVertexProvider {
        public LineageSource(long from, long to) {
            super(from, to);
        }

        @Override
        public LineageVertex getLineageVertex() {
            LineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            SOURCE_DATASET_NAME, SOURCE_DATASET_NAMESPACE, new HashMap<>());
            DefaultSourceLineageVertex lineageVertex =
                    new DefaultSourceLineageVertex(Boundedness.BOUNDED);
            lineageVertex.addDataset(lineageDataset);
            return lineageVertex;
        }
    }

    /** {@link DiscardingSink} that additionally reports a sink vertex with a single dataset. */
    private static class LineageSink extends DiscardingSink<Long>
            implements LineageVertexProvider {
        @Override
        public LineageVertex getLineageVertex() {
            LineageDataset lineageDataset =
                    new DefaultLineageDataset(
                            SINK_DATASET_NAME, SINK_DATASET_NAMESPACE, new HashMap<>());
            DefaultLineageVertex lineageVertex = new DefaultLineageVertex();
            lineageVertex.addLineageDataset(lineageDataset);
            return lineageVertex;
        }
    }
}

