/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.runtime.functions.scalar.SourceWatermarkFunction;
import org.apache.flink.table.utils.LegacyRowExtension;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015c\u0001\u0002\u000e\u001c\u00011BQa\r\u0001\u0005\u0002QBqa\u000e\u0001C\u0002\u0013%\u0001\b\u0003\u0004G\u0001\u0001\u0006I!\u000f\u0005\u0006)\u0002!\t%\u0016\u0005\u0006C\u0002!\t!\u0016\u0005\u0006M\u0002!\t!\u0016\u0005\u0006Q\u0002!\t!\u0016\u0005\u0006U\u0002!\t!\u0016\u0005\u0006Y\u0002!\t!\u0016\u0005\u0006]\u0002!\t!\u0016\u0005\u0006a\u0002!\t!\u0016\u0005\u0006e\u0002!\t!\u0016\u0005\u0006i\u0002!\t!\u0016\u0005\u0006m\u0002!\t!\u0016\u0005\u0006q\u0002!\t!\u0016\u0005\u0006u\u0002!\t!\u0016\u0005\u0006y\u0002!\t!\u0016\u0005\u0006}\u0002!\t!\u0016\u0005\u0007\u0003\u0003\u0001A\u0011A+\t\r\u0005\u0015\u0001\u0001\"\u0001V\u0011\u0019\tI\u0001\u0001C\u0001+\"9\u0011Q\u0002\u0001\u0005\n\u0005=\u0001BBA\u001d\u0001\u0011\u0005Q\u000b\u0003\u0004\u0002>\u0001!\t!\u0016\u0005\u0007\u0003\u0003\u0002A\u0011A+\u0003#Q\u000b'\r\\3T_V\u00148-Z%U\u0007\u0006\u001cXM\u0003\u0002\u001d;\u0005\u00191/\u001d7\u000b\u0005yy\u0012AB:ue\u0016\fWN\u0003\u0002!C\u00059!/\u001e8uS6,'B\u0001\u0012$\u0003\u001d\u0001H.\u00198oKJT!\u0001J\u0013\u0002\u000bQ\f'\r\\3\u000b\u0005\u0019:\u0013!\u00024mS:\\'B\u0001\u0015*\u0003\u0019\t\u0007/Y2iK*\t!&A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001[A\u0011a&M\u0007\u0002_)\u0011\u0001gH\u0001\u0006kRLGn]\u0005\u0003e=\u0012\u0011c\u0015;sK\u0006l\u0017N\\4UKN$()Y:f\u0003\u0019a\u0014N\\5u}Q\tQ\u0007\u0005\u00027\u00015\t1$A\u0001`+\u0005I\u0004c\u0001\u001e@\u00036\t1H\u0003\u0002={\u0005IA/Z:ukRLGn\u001d\u0006\u0003}\u0015\nAaY8sK&\u0011\u0001i\u000f\u0002\u0014\u000b\u0006\u001c\u0007nQ1mY\n\f7m[,sCB\u0004XM\u001d\t\u0003\u0005\u0012k\u0011a\u0011\u0006\u0003a\rJ!!R\"\u0003%1+w-Y2z%><X\t\u001f;f]NLwN\\\u0001\u0003?\u0002B#a\u0001%\u0011\u0005%\u0013V\"\u0001&\u000b\u0005-c\u0015!C3yi\u0016t7/[8o\u0015\tie*A\u0002ba&T!a\u0014)\u0002\u000f),\b/\u001b;fe*\u0011\u0011+K\u0001\u0006UVt\u0017\u000e^\u0005\u0003'*\u0013\u0011CU3hSN$XM]#yi\u0016t7/[8o\u0003\u0019\u0011WMZ8sKR\ta\u000b\u0005\u0002X56\t\u0001LC\u0001Z\u0003\u0015\u00198-\u00197b\u0013\tY\u0006L\u0001\u0003V]&$\bF\u0001\u0003^!\tqv,D\u0001M\u0013\t\u0001GJ\u0001\u0006CK\u001a|'/Z#bG\"\f\u0011\u0003^3tiNKW\u000e\u001d7f!J|'.Z2uQ\t)1\r\u0005\u0002_I&\u0011Q\r\u0014\u0002\u0005)\u0016\u001cH/\u0001\u000euKN$\bK]8kK\u000e$x+\u001b;i_V$\u0018J\u001c9viJ+g\r\u000b\u0002\u0007G\u0006\tB/Z:u\u001d\u0016\u001cH/\u001a3Qe>TWm\u0019;)\u0005\u001d\u0019\u0017!\u0007;fgRtUm\u001d;fIB\u0013xN[3di^KG\u000f[%uK6D#\u0001C2\u0002;Q,7\u000f\u001e+bE2,7k\\;sG\u0016<\u0016\u000e\u001e5GS2$XM]1cY\u0016D#!C2\u0002KQ,7\u000f\u001e+bE2,7k\\;sG\u0016<\u0016\u000e\u001e5Gk:\u001cG/[8o\r&dG/\u001a:bE2,\u0007F\u0001\u0006d\u0003U!Xm\u001d;J]B,HOR8s[\u0006$8k\\;sG\u0016D#aC2\u0002!Q,7\u000f^!mY\u0012\u000bG/\u0019+za\u0016\u001c\bF\u0001\u0007d\u0003a!Xm\u001d;TS6\u0004H.Z'fi\u0006$\u0017\r^1BG\u000e,7o\u001d\u0015\u0003\u001b\r\f\u0011\u0004^3ti\u000e{W\u000e\u001d7fq6+G/\u00193bi\u0006\f5mY3tg\"\u0012abY\u0001!i\u0016\u001cH\u000fR;qY&\u001c\u0017\r^3NKR\fG-\u0019;b\rJ|WnU1nK.+\u0017\u0010\u000b\u0002\u0010G\u00061C/Z:u\u001d\u0016\u001cH/\u001a3Qe>TWm\u0019;j_:<\u0016\u000e\u001e5NKR\fG-\u0019;b\u0003\u000e\u001cWm]:)\u0005A\u0019\u0017\u0001\u0007;fgR\u001cv.\u001e:dK^\u000bG/\u001a:nCJ\\\u0017J\u001c#E\u0019\"\u0012\u0011cY\u0001\u001bi\u0016\u001cHoU8ve\u000e,w+\u0019;fe6\f'o[%o#V,'/\u001f\u0015\u0003%\r\fa\u0003^3tiNKW\u000e\u001d7f\u001d\u0016\u001cH/\u001a3GS2$XM\u001d\u0015\u0003'\r\fq\u0003^3ti:+7\u000f^3e\r&dG/\u001a:P]\u0006\u0013(/Y=)\u0005Q\u0019\u0017!\u0006;fgRtUm\u001d;fI\u001aKG\u000e^3s\u001f:l\u0015\r\u001d\u0015\u0003+\r\fq#\u001b8oKJ$Vm\u001d;TKR\u0004\u0016M]1mY\u0016d\u0017n]7\u0015\u000fY\u000b\t\"a\u000b\u00026!9\u00111\u0003\fA\u0002\u0005U\u0011\u0001\u00039s_ZLG-\u001a:\u0011\t\u0005]\u0011Q\u0005\b\u0005\u00033\t\t\u0003E\u0002\u0002\u001cak!!!\b\u000b\u0007\u0005}1&\u0001\u0004=e>|GOP\u0005\u0004\u0003GA\u0016A\u0002)sK\u0012,g-\u0003\u0003\u0002(\u0005%\"AB*ue&twMC\u0002\u0002$aCq!!\f\u0017\u0001\u0004\ty#A\u0006qCJ\fG\u000e\\3mSNl\u0007cA,\u00022%\u0019\u00111\u0007-\u0003\u0007%sG\u000fC\u0004\u00028Y\u0001\r!a\f\u0002\u000b%tG-\u001a=\u0002CQ,7\u000f\u001e)be\u0006dG.\u001a7jg6<\u0016\u000e\u001e5T_V\u00148-\u001a$v]\u000e$\u0018n\u001c8)\u0005]\u0019\u0017A\b;fgR\u0004\u0016M]1mY\u0016d\u0017n]7XSRD\u0017J\u001c9vi\u001a{'/\\1uQ\tA2-A\u000fuKN$\b+\u0019:bY2,G.[:n/&$\b\u000eR1uCN#(/Z1nQ\tI2\r")
public class TableSourceITCase
extends StreamingTestBase {
    @RegisterExtension
    private final EachCallbackWrapper<LegacyRowExtension> _ = new EachCallbackWrapper((CustomExtension)new LegacyRowExtension());

    private EachCallbackWrapper<LegacyRowExtension> _() {
        return this._;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        String myTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(379).append("\n                       |CREATE TABLE MyTable (\n                       |  `a` INT,\n                       |  `b` BIGINT,\n                       |  `c` STRING\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(myTableDataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        String filterableTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.orderedLoopRows());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(488).append("\n                       |CREATE TABLE FilterableTable (\n                       |  name STRING,\n                       |  id BIGINT,\n                       |  amount INT,\n                       |  price DOUBLE\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'filterable-fields' = 'amount',\n                       |  'data-id' = '").append(filterableTableDataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        String metadataTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData5());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(564).append("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3',\n         |  `other_metadata2` AS CAST(`other_metadata` AS BIGINT),\n         |  `b` BIGINT,\n         |  `metadata_1` INT METADATA,\n         |  `computed` AS `metadata_1` * 2,\n         |  `metadata_2` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(metadataTableDataId).append("',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n         |)\n         |").toString())).stripMargin());
        String nestedTableDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.deepNestedRow());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(684).append("\n         |CREATE TABLE NestedTable (\n         |  id BIGINT,\n         |  deepNested ROW<\n         |     nested1 ROW<name STRING, `value.` INT>,\n         |     `nested2.` ROW<num INT, flag BOOLEAN>>,\n         |  nested ROW<name STRING, `value` INT>,\n         |  name STRING,\n         |  nestedItem ROW<deepArray ROW<`value` INT> ARRAY, deepMap MAP<STRING, INT>>,\n         |  lower_name AS LOWER(name)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'nested-projection-supported' = 'true',\n         |  'filterable-fields' = '`nested.value`;`nestedItem.deepMap`;`nestedItem.deepArray`',\n         |  'data-id' = '").append(nestedTableDataId).append("',\n         |  'bounded' = 'true'\n         |)\n         |").toString())).stripMargin());
    }

    @Test
    public void testSimpleProject() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Hi", (List)new .colon.colon((Object)"2,Hello", (List)new .colon.colon((Object)"3,Hello world", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testProjectWithoutInputRef() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT COUNT(*) FROM MyTable")).toRetractStream(TypeExtractor.createTypeInfo(Row.class));
        TestingRetractSink sink = new TestingRetractSink();
        result.addSink((SinkFunction)sink).setParallelism(result.getParallelism());
        this.env().execute();
        Assertions.assertThat((Object)sink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo((Object)new .colon.colon((Object)"3", (List)Nil$.MODULE$));
    }

    @Test
    public void testNestedProject() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    lower_name\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Sarah,10000,true,1100,mary", (List)new .colon.colon((Object)"2,Rob,20000,false,2200,bob", (List)new .colon.colon((Object)"3,Mike,30000,true,3300,liz", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectWithItem() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT nestedItem.deepArray[nestedItem.deepMap['Monday']] FROM  NestedTable\n        |")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1", (List)new .colon.colon((Object)"1", (List)new .colon.colon((Object)"1", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"5,5,Record_5", (List)new .colon.colon((Object)"6,6,Record_6", (List)new .colon.colon((Object)"7,7,Record_7", (List)new .colon.colon((Object)"8,8,Record_8", (List)Nil$.MODULE$))));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testTableSourceWithFunctionFilterable() {
        String query = "SELECT id, amount, name FROM FilterableTable WHERE amount > 4 AND price < 9 AND upper(name) = 'RECORD_5'";
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"5,5,Record_5", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testInputFormatSource() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.smallData3());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(263).append("\n         |CREATE TABLE MyInputFormatTable (\n         |  `a` INT,\n         |  `b` BIGINT,\n         |  `c` STRING\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(dataId).append("',\n         |  'runtime-source' = 'InputFormat'\n         |)\n         |").toString())).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT a, c FROM MyInputFormatTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Hi", (List)new .colon.colon((Object)"2,Hello", (List)new .colon.colon((Object)"3,Hello world", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testAllDataTypes() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.fullDataTypesData());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(629).append("\n         |CREATE TABLE T (\n         |  `a` BOOLEAN,\n         |  `b` TINYINT,\n         |  `c` SMALLINT,\n         |  `d` INT,\n         |  `e` BIGINT,\n         |  `f` FLOAT,\n         |  `g` DOUBLE,\n         |  `h` DECIMAL(5, 2),\n         |  `x` DECIMAL(30, 10),\n         |  `i` VARCHAR(5),\n         |  `j` CHAR(5),\n         |  `k` DATE,\n         |  `l` TIME(0),\n         |  `m` TIMESTAMP(9),\n         |  `n` TIMESTAMP(9) WITH LOCAL TIME ZONE,\n         |  `o` ARRAY<BIGINT>,\n         |  `p` ROW<f1 BIGINT, f2 STRING, f3 DOUBLE>\n         |) WITH (\n         |  'connector' = 'values',\n         |  'data-id' = '").append(dataId).append("'\n         |)\n         |").toString())).stripMargin());
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT * FROM T")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"true,127,32767,2147483647,9223372036854775807,-1.123,-1.123,5.10,1234567891012345.1000000000,1,1,1969-01-01,00:00:00.123,1969-01-01T00:00:00.123456789,1969-01-01T00:00:00.123456789Z,[1, 2, 3],1,a,2.3", (List)new .colon.colon((Object)"false,-128,-32768,-2147483648,-9223372036854775808,3.4,3.4,6.10,61234567891012345.1000000000,12,12,1970-09-30,01:01:01.123,1970-09-30T01:01:01.123456,1970-09-30T01:01:01.123456Z,[4, 5],null,b,4.56", (List)new .colon.colon((Object)"true,0,0,0,0,0.12,0.12,7.10,71234567891012345.1000000000,123,123,1990-12-24,08:10:24.123,1990-12-24T08:10:24.123,1990-12-24T08:10:24.123Z,[6, null, 7],3,null,7.86", (List)new .colon.colon((Object)"false,5,4,123,1234,1.2345,1.2345,8.12,812345678910123451.0123456789,1234,1234,2020-05-01,23:23:23,2020-05-01T23:23:23,2020-05-01T23:23:23Z,[8],4,c,null", (List)new .colon.colon((Object)"null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null", (List)Nil$.MODULE$)))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testSimpleMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `b`, `metadata_2` FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,1,Hallo", (List)new .colon.colon((Object)"2,2,Hallo Welt", (List)new .colon.colon((Object)"2,3,Hallo Welt wie", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testComplexMetadataAccess() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT `a`, `other_metadata`, `b`, `metadata_2`, `computed` FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,1,1,Hallo,0", (List)new .colon.colon((Object)"2,2,2,Hallo Welt,2", (List)new .colon.colon((Object)"2,1,3,Hallo Welt wie,4", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testDuplicateMetadataFromSameKey() {
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery("SELECT other_metadata, other_metadata2, metadata_2 FROM MetadataTable")).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,1,Hallo", (List)new .colon.colon((Object)"1,1,Hallo Welt wie", (List)new .colon.colon((Object)"2,2,Hallo Welt", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedProjectionWithMetadataAccess() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |    deepNested.nested1.name AS nestedName,\n        |    nested.`value` AS nestedValue,\n        |    deepNested.`nested2.`.flag AS nestedFlag,\n        |    deepNested.`nested2.`.num + deepNested.nested1.`value.` AS nestedNum,\n        |    LOWER(name) as lowerName\n        |FROM NestedTable\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Sarah,10000,true,1100,mary", (List)new .colon.colon((Object)"2,Rob,20000,false,2200,bob", (List)new .colon.colon((Object)"3,Mike,30000,true,3300,liz", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testSourceWatermarkInDDL() {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.data3WithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(501).append("\n                       |CREATE TABLE tableWithWatermark (\n                       |  `a` INT,\n                       |  `b` BIGINT,\n                       |  `c` STRING,\n                       |  `ts` TIMESTAMP(3),\n                       |  WATERMARK FOR ts AS SOURCE_WATERMARK()\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("',\n                       |  'bounded' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("SELECT * FROM tableWithWatermark").await()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)SourceWatermarkFunction.ERROR_MESSAGE)});
    }

    @Test
    public void testSourceWatermarkInQuery() {
        Assertions.assertThatThrownBy(() -> this.tEnv().executeSql("SELECT *, SOURCE_WATERMARK() FROM MyTable").print()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(TableException.class, (String)SourceWatermarkFunction.ERROR_MESSAGE)});
    }

    @Test
    public void testSimpleNestedFilter() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id, deepNested.nested1.name AS nestedName FROM NestedTable\n        |   WHERE nested.`value` > 20000\n    ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"3,Mike", (List)Nil$.MODULE$);
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedFilterOnArray() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |   deepNested.nested1.name AS nestedName,\n        |   nestedItem.deepArray[2].`value` FROM NestedTable\n        |WHERE nestedItem.deepArray[2].`value` > 1\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Sarah,2", (List)new .colon.colon((Object)"2,Rob,2", (List)new .colon.colon((Object)"3,Mike,2", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    @Test
    public void testNestedFilterOnMap() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT id,\n        |   deepNested.nested1.name AS nestedName,\n        |   nestedItem.deepMap['Monday'] FROM NestedTable\n        |WHERE nestedItem.deepMap['Monday'] = 1\n      ")).stripMargin();
        DataStream result = package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream();
        TestingAppendSink sink = new TestingAppendSink();
        result.addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"1,Sarah,1", (List)new .colon.colon((Object)"2,Rob,1", (List)new .colon.colon((Object)"3,Mike,1", (List)Nil$.MODULE$)));
        Assertions.assertThat((Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).isEqualTo(expected.sorted((Ordering)Ordering.String$.MODULE$));
    }

    private void innerTestSetParallelism(String provider, int parallelism, int index) {
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.data1());
        String sourceTableName = new StringBuilder(18).append("test_para_source_").append(provider.toLowerCase().trim()).append("_").append(index).toString();
        String sinkTableName = new StringBuilder(16).append("test_para_sink_").append(provider.toLowerCase().trim()).append("_").append(index).toString();
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(546).append("\n                       |CREATE TABLE ").append(sourceTableName).append(" (\n                       |  the_month INT,\n                       |  area STRING,\n                       |  product INT\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'data-id' = '").append(dataId).append("',\n                       |  'bounded' = 'true',\n                       |  'runtime-source' = '").append(provider).append("',\n                       |  'scan.parallelism' = '").append(parallelism).append("',\n                       |  'enable-projection-push-down' = 'false'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(346).append("\n                       |CREATE TABLE ").append(sinkTableName).append(" (\n                       |  the_month INT,\n                       |  area STRING,\n                       |  product INT\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'true'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().executeSql(new StringBuilder(27).append("INSERT INTO ").append(sinkTableName).append(" SELECT * FROM ").append(sourceTableName).toString()).await();
    }

    @Test
    public void testParallelismWithSourceFunction() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(1);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("SourceFunction", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("SourceFunction", validParallelism, index.getAndIncrement());
    }

    @Test
    public void testParallelismWithInputFormat() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(2);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("InputFormat", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("InputFormat", validParallelism, index.getAndIncrement());
    }

    @Test
    public void testParallelismWithDataStream() {
        int negativeParallelism = -1;
        int validParallelism = 3;
        AtomicInteger index = new AtomicInteger(3);
        Assertions.assertThatThrownBy(() -> this.innerTestSetParallelism("DataStream", negativeParallelism, index.getAndIncrement())).hasMessageContaining("Invalid configured parallelism");
        this.innerTestSetParallelism("DataStream", validParallelism, index.getAndIncrement());
    }
}

