/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.expressions.utils.Func0$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.stream.sql.TableScanTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Ud\u0001B\u0016-\u0001uBQ\u0001\u0012\u0001\u0005\u0002\u0015Cq\u0001\u0013\u0001C\u0002\u0013%\u0011\n\u0003\u0004N\u0001\u0001\u0006IA\u0013\u0005\u0006\u001d\u0002!\ta\u0014\u0005\u0006C\u0002!\ta\u0014\u0005\u0006G\u0002!\ta\u0014\u0005\u0006K\u0002!\ta\u0014\u0005\u0006O\u0002!\ta\u0014\u0005\u0006S\u0002!\ta\u0014\u0005\u0006W\u0002!\ta\u0014\u0005\u0006[\u0002!\ta\u0014\u0005\u0006_\u0002!\ta\u0014\u0005\u0006c\u0002!\ta\u0014\u0005\u0006g\u0002!\ta\u0014\u0005\u0006k\u0002!\ta\u0014\u0005\u0006o\u0002!\ta\u0014\u0005\u0006s\u0002!\ta\u0014\u0005\u0006w\u0002!\ta\u0014\u0005\u0006{\u0002!\ta\u0014\u0005\u0006\u007f\u0002!\ta\u0014\u0005\u0007\u0003\u0007\u0001A\u0011A(\t\r\u0005\u001d\u0001\u0001\"\u0001P\u0011\u0019\tY\u0001\u0001C\u0001\u001f\"1\u0011q\u0002\u0001\u0005\u0002=Cq!a\u0005\u0001\t\u0013\t)\u0002\u0003\u0004\u00022\u0001!\ta\u0014\u0005\u0007\u0003k\u0001A\u0011A(\t\r\u0005e\u0002\u0001\"\u0001P\u0011\u0019\ti\u0004\u0001C\u0001\u001f\"1\u0011\u0011\t\u0001\u0005\u0002=Ca!!\u0012\u0001\t\u0003y\u0005BBA%\u0001\u0011\u0005q\n\u0003\u0004\u0002N\u0001!\ta\u0014\u0005\u0007\u0003#\u0002A\u0011A(\t\r\u0005U\u0003\u0001\"\u0001P\u0011\u0019\tI\u0006\u0001C\u0001\u001f\"1\u0011Q\f\u0001\u0005\u0002=Ca!!\u0019\u0001\t\u0003y\u0005BBA3\u0001\u0011\u0005q\n\u0003\u0004\u0002j\u0001!\ta\u0014\u0005\u0007\u0003[\u0002A\u0011A(\t\r\u0005E\u0004\u0001\"\u0001P\u00055!\u0016M\u00197f'\u000e\fg\u000eV3ti*\u0011QFL\u0001\u0004gFd'BA\u00181\u0003\u0019\u0019HO]3b[*\u0011\u0011GM\u0001\u0005a2\fgN\u0003\u00024i\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u001b7\u0003\u0015!\u0018M\u00197f\u0015\t9\u0004(A\u0003gY&t7N\u0003\u0002:u\u00051\u0011\r]1dQ\u0016T\u0011aO\u0001\u0004_J<7\u0001A\n\u0003\u0001y\u0002\"a\u0010\"\u000e\u0003\u0001S!!\u0011\u001a\u0002\u000bU$\u0018\u000e\\:\n\u0005\r\u0003%!\u0004+bE2,G+Z:u\u0005\u0006\u001cX-\u0001\u0004=S:LGO\u0010\u000b\u0002\rB\u0011q\tA\u0007\u0002Y\u0005!Q\u000f^5m+\u0005Q\u0005CA 
L\u0013\ta\u0005IA\nTiJ,\u0017-\u001c+bE2,G+Z:u+RLG.A\u0003vi&d\u0007%\u0001\nuKN$H)\u0019;b'R\u0014X-Y7TG\u0006tG#\u0001)\u0011\u0005E#V\"\u0001*\u000b\u0003M\u000bQa]2bY\u0006L!!\u0016*\u0003\tUs\u0017\u000e\u001e\u0015\u0003\t]\u0003\"\u0001W0\u000e\u0003eS!AW.\u0002\u0007\u0005\u0004\u0018N\u0003\u0002];\u00069!.\u001e9ji\u0016\u0014(B\u00010;\u0003\u0015QWO\\5u\u0013\t\u0001\u0017L\u0001\u0003UKN$\u0018\u0001\u0005;fgR$E\t\u0014+bE2,7kY1oQ\t)q+A\ruKN$H\t\u0012'XSRD7i\\7qkR,GmQ8mk6t\u0007F\u0001\u0004X\u0003\u0001\"Xm\u001d;E\t2;\u0016\u000e\u001e5S_^$\u0016\u0010]3D_6\u0004X\u000f^3e\u0007>dW/\u001c8)\u0005\u001d9\u0016!\u0007;fgR$E\tT,ji\"lU\r^1eCR\f7i\u001c7v[:D#\u0001C,\u0002eQ,7\u000f\u001e#E\u0019^KG\u000f['fi\u0006$\u0017\r^1UQ\u0006$8i\u001c8gY&\u001cGo],ji\"\u0004\u0006._:jG\u0006d7i\u001c7v[:D#!C,\u0002WQ,7\u000f\u001e#E\u0019^KG\u000f['fi\u0006$\u0017\r^1D_2,XN\u001c)s_*,7\r^5p]B+8\u000f\u001b#po:D#AC,\u0002EQ,7\u000f\u001e#E\u0019^KG\u000f[,bi\u0016\u0014X.\u0019:l\u0007>l\u0007/\u001e;fI\u000e{G.^7oQ\tYq+A\u0013uKN$H\t\u0012'XSRD7i\\7qkR,GmQ8mk6t'+\u001a4feJ{w\u000f^5nK\"\u0012AbV\u0001.i\u0016\u001cH\u000f\u0012#M/&$\b.T;mi&\u0004H.Z\"pYVlgn\u001d$s_6\u001c\u0016-\\3NKR\fG-\u0019;b\u0017\u0016L\bFA\u0007X\u00039\"Xm\u001d;E\t2;\u0016\u000e\u001e5Nk2$\u0018\u000e\u001d7f\u0007>dW/\u001c8t\rJ|WnU1nK6+G/\u00193bi\u0006\\U-\u001f\u001a)\u000599\u0016a\n;fgR\\U-_<pe\u0012\u001cx+\u001b;i/\u0006$XM]7be.\u001cu.\u001c9vi\u0016$7i\u001c7v[:D#aD,\u0002/Q,7\u000f^*dC:|eNQ8v]\u0012,GmU8ve\u000e,\u0007F\u0001\tX\u0003m!Xm\u001d;GS2$XM](o\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK\"\u0012\u0011cV\u0001\u001ai\u0016\u001cHoU2b]>s7\t[1oO\u0016dwnZ*pkJ\u001cW\r\u000b\u0002\u0013/\u00061C/Z:u+:LwN\\\"iC:<W\r\\8h'>,(oY3B]\u0012\fum\u001a:fO\u0006$\u0018n\u001c8)\u0005M9\u0016A\b;fgR\fum\u001a:fO\u0006$Xm\u00148DQ\u0006tw-\u001a7pON{WO]2fQ\t!r+A\ruKN$(j\\5o\u001f:\u001c\u0005.\u00198hK2|wmU8ve\u000e,\u0007FA\u000bX\u00031\"Xm\u001d;K_&twJ\\\"iC:<W\r\\8h'>,(oY3XSRDWI^3oiN
$U\u000f\u001d7jG\u0006$X\r\u000b\u0002\u0017/\u0006AB/Z:u\u0015>Lgn\u00148O_V\u0003H-\u0019;f'>,(oY3)\u0005]9\u0016A\u0006;fgRTu.\u001b8P]V\u00038/\u001a:u'>,(oY3)\u0005a9\u0016A\u0005<fe&4\u0017PS8j]>s7k\\;sG\u0016$2\u0001UA\f\u0011\u001d\tI\"\u0007a\u0001\u00037\tQb\u00195b]\u001e,Gn\\4N_\u0012,\u0007\u0003BA\u000f\u0003WqA!a\b\u0002(A\u0019\u0011\u0011\u0005*\u000e\u0005\u0005\r\"bAA\u0013y\u00051AH]8pizJ1!!\u000bS\u0003\u0019\u0001&/\u001a3fM&!\u0011QFA\u0018\u0005\u0019\u0019FO]5oO*\u0019\u0011\u0011\u0006*\u0002?Q,7\u000f^,bi\u0016\u0014X.\u0019:l\u0003:$7\t[1oO\u0016dwnZ*pkJ\u001cW\r\u000b\u0002\u001b/\u00061C/Z:u\u0007\"\fgnZ3m_\u001e\u001cv.\u001e:dK^KG\u000f[#wK:$8\u000fR;qY&\u001c\u0017\r^3)\u0005m9\u0016A\u0006;fgR\u001c6-\u00198P]V\u00038/\u001a:u'>,(oY3)\u0005q9\u0016A\f;fgR,\u0006o]3siN{WO]2f/&$\bnQ8naV$X\rZ\"pYVlg.\u00118e/\u0006$XM]7be.D#!H,\u0002KQ,7\u000f^+qg\u0016\u0014HoU8ve\u000e,w+\u001b;i/\u0006$XM]7be.\u0004Vo\u001d5E_^t\u0007F\u0001\u0010X\u0003\r\"Xm\u001d;V]&|g.\u00169tKJ$8k\\;sG\u0016\fe\u000eZ!hOJ,w-\u0019;j_:D#aH,\u00027Q,7\u000f^!hOJ,w-\u0019;f\u001f:,\u0006o]3siN{WO]2fQ\t\u0001s+A\u0013uKN$\u0018iZ4sK\u001e\fG/Z(o+B\u001cXM\u001d;T_V\u00148-\u001a)sS6\f'/_&fs\"\u0012\u0011eV\u0001'i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&twJ\\+qg\u0016\u0014HoU8ve\u000e,\u0007F\u0001\u0012X\u0003\u001d\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8P]V\u00038/\u001a:u'>,(oY3)\u0005\r:\u0016\u0001\n;fgR<\u0016N\u001c3po\u0006;wM]3hCR,wJ\\\"iC:<W\r\\8h'>,(oY3)\u0005\u0011:\u0016A\b;fgRLeN^1mS\u0012\u001cv.\u001e:dK\u000eC\u0017M\\4fY><Wj\u001c3fQ\t)s+\u0001\u0013uKN$X*[:tS:<\u0007K]5nCJL8*Z=G_J,\u0006o]3siN{WO]2fQ\t1s+A\u0014uKN$X*[:tS:<\u0007K]5nCJL8*Z=G_J,e/\u001a8ug\u0012+\b\u000f\\5dCR,\u0007FA\u0014X\u0003u!Xm\u001d;J]Z\fG.\u001b3TG\u0006twJ\u001c'p_.,\boU8ve\u000e,\u0007F\u0001\u0015X\u0003y!Xm\u001d;J]Z\fG.\u001b3XCR,'/\\1sW>+H\u000f];u)f\u0004X\r\u000b\u0002*/\u0006YB/Z:u'\u0016$\b+\u0019:bY2,G.[:n\r>\u00148k\\;sG\u0016D#
AK,")
public class TableScanTest
extends TableTestBase {
    /** Stream planner test utility, initialised together with the test instance. */
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    /**
     * Accessor for the stream planner test utility held by this test instance.
     *
     * @return the {@link StreamTableTestUtil} stored in the {@code util} field
     */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Registers a data stream named {@code DataStreamTable} with the field expressions
     * {@code a}, {@code b} and {@code c}, then verifies the exec plan of
     * {@code SELECT * FROM DataStreamTable}.
     *
     * <p>NOTE(review): this body is decompiler output (see the file header). The anonymous
     * {@code CaseClassTypeInfo} subclass presumably corresponds to compiler-generated type
     * information for a {@code Tuple3} in the original Scala source - confirm against that source
     * before editing by hand.
     */
    @Test
    public void testDataStreamScan() {
        this.util().addDataStream("DataStreamTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` array of the given type-info instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Builds one field serializer per arity slot and returns a ScalaCaseClassSerializer
            // constructed from this type's class and those field serializers.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // Fills fieldSerializers[i] for every i in [0, arity) from the matching field type.
                // NOTE(review): `fieldSerializers$1` is the decompiler's name for the captured array.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous serializer is instantiated but never returned or otherwise used.
                // NOTE(review): presumably retained by the original source for compatibility - confirm.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Rebuilds the tuple from (int, long, String) values held in `fields`.
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Lambda deserialization hook emitted for the lambda above.
            // NOTE(review): the MethodHandle array initializer below is a decompiler rendering
            // and is not valid Java source.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.TableScanTest$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        this.util().verifyExecPlan("SELECT * FROM DataStreamTable");
    }

    /**
     * Creates table {@code src} (values connector, watermark on {@code ts}) through DDL and
     * verifies the exec plan of a filtered scan over it.
     */
    @Test
    public void testDDLTableScan() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} with three computed columns (one of them
     * calling {@code my_udf}) and verifies the exec plan of a full scan.
     */
    @Test
    public void testDDLWithComputedColumn() {
        StreamTableTestUtil testUtil = util();
        testUtil.addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a)\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates table {@code t1} whose computed column {@code c} is {@code row(a, b)} and verifies
     * the exec plan of a full scan.
     */
    @Test
    public void testDDLWithRowTypeComputedColumn() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as row(a, b)\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates {@code MetadataTable} with a virtual metadata column, a regular metadata column and
     * a computed column derived from metadata, then verifies the exec plan of a full scan.
     */
    @Test
    public void testDDLWithMetadataColumn() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA,\n         |  `computed` AS UPPER(`metadata_1`)\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM MetadataTable");
    }

    /**
     * Creates {@code MetadataTable} where the physical column {@code timestamp} shares its name
     * with the metadata key read by {@code metadata_timestamp}, then verifies the exec plan of a
     * full scan.
     */
    @Test
    public void testDDLWithMetadataThatConflictsWithPhysicalColumn() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE MetadataTable (\n                     |  `timestamp` TIMESTAMP(9),\n                     |  `metadata_timestamp` TIMESTAMP(0) METADATA FROM 'timestamp',\n                     |  `other` STRING METADATA,\n                     |  `computed_other` AS UPPER(`other`),\n                     |  `computed_timestamp` AS CAST(`metadata_timestamp` AS STRING)\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'false',\n                     |  'readable-metadata' = 'timestamp:TIMESTAMP(0), other:STRING'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM MetadataTable");
    }

    /**
     * Creates {@code MetadataTable} with metadata columns and verifies the exec plan of a query
     * that selects only {@code b} and {@code other_metadata}.
     */
    @Test
    public void testDDLWithMetadataColumnProjectionPushDown() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE MetadataTable (\n         |  `a` INT,\n         |  `other_metadata` INT METADATA FROM 'metadata_3' VIRTUAL,\n         |  `b` BIGINT,\n         |  `c` INT,\n         |  `metadata_1` STRING METADATA\n         |) WITH (\n         |  'connector' = 'values',\n         |  'bounded' = 'false',\n         |  'readable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN, metadata_3:BIGINT',\n         |  'writable-metadata' = 'metadata_1:STRING, metadata_2:BOOLEAN'\n         |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT `b`, `other_metadata` FROM MetadataTable");
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} whose watermark is declared on the
     * computed column {@code d}, and verifies the exec plan of a full scan.
     */
    @Test
    public void testDDLWithWatermarkComputedColumn() {
        StreamTableTestUtil testUtil = util();
        testUtil.addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  c as a + 1,\n                     |  d as to_timestamp(b),\n                     |  e as my_udf(a),\n                     |  WATERMARK FOR d AS d - INTERVAL '0.001' SECOND\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates table {@code src} with a computed column {@code my_ts} derived from the watermarked
     * column {@code ts} plus a {@code PROCTIME()} column, then verifies the exec plan of a
     * filtered scan.
     */
    @Test
    public void testDDLWithComputedColumnReferRowtime() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  my_ts AS ts - INTERVAL '0.001' SECOND,\n                    |  proc AS PROCTIME(),\n                    |  WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND\n                    |) WITH (\n                    |  'connector' = 'values'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM src WHERE a > 1");
    }

    /**
     * Expects a {@link ValidationException} somewhere in the cause chain when a single CREATE
     * TABLE statement declares two columns that read the same metadata key {@code 'a'}.
     */
    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey() {
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                                                        |CREATE TABLE source (\n                                                        |  a INT METADATA,\n                                                        |  b INT METADATA FROM 'a'\n                                                        |) WITH (\n                                                        |  'connector' = 'COLLECTION'\n                                                        |)\n                                                        |")).stripMargin();
        String expectedMessage = "The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.";
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(ddl))
                .satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ValidationException.class, expectedMessage)});
    }

    /**
     * Creates table {@code source} with metadata column {@code a}, then expects the duplicate
     * metadata key message in the cause chain when {@code like_source} adds column {@code b} from
     * the same key while including the metadata of {@code source} via LIKE.
     */
    @Test
    public void testDDLWithMultipleColumnsFromSameMetadataKey2() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                               |CREATE TABLE source (\n                               |  a INT METADATA\n                               |) WITH (\n                               |  'connector' = 'COLLECTION'\n                               |)\n                               |")).stripMargin();
        this.util().tableEnv().executeSql(sourceDdl);

        String likeDdl = new StringOps(Predef$.MODULE$.augmentString("\n            |CREATE TABLE like_source (\n            |  b INT METADATA FROM 'a'\n            |)\n            |WITH (\n            |  'connector' = 'COLLECTION'\n            |) LIKE source (\n            |  INCLUDING METADATA\n            |)\n            |")).stripMargin();
        String expectedMessage = "The column `a` and `b` in the table are both from the same metadata key 'a'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.";
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(likeDdl))
                .satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(expectedMessage)});
    }

    /**
     * Registers {@code my_udf}, creates table {@code t1} whose column names are back-quoted
     * keywords ({@code time}, {@code current_time}, {@code timestamp}) with a watermark on the
     * computed {@code timestamp} column, and verifies the exec plan of a full scan.
     */
    @Test
    public void testKeywordsWithWatermarkComputedColumn() {
        StreamTableTestUtil testUtil = util();
        testUtil.addTemporarySystemFunction("my_udf", (UserDefinedFunction)Func0$.MODULE$);
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table t1(\n                     |  a int,\n                     |  b varchar,\n                     |  `time` time,\n                     |  mytime as `time`,\n                     |  `current_time` as current_time,\n                     |  json_row ROW<`timestamp` TIMESTAMP(3)>,\n                     |  `timestamp` AS json_row.`timestamp`,\n                     |  WATERMARK FOR `timestamp` AS `timestamp`\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(ddl);
        testUtil.verifyExecPlan("SELECT * FROM t1");
    }

    /**
     * Creates table {@code src} with {@code 'bounded' = 'true'} and verifies the rel plan of a
     * filtered scan, including the changelog-mode explain detail.
     */
    @Test
    public void testScanOnBoundedSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'true'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB,D} and verifies the rel plan of
     * a filtered scan, including the changelog-mode explain detail.
     */
    @Test
    public void testFilterOnChangelogSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB,D} and verifies the rel plan of
     * a reordered projection over it, including the changelog-mode explain detail.
     */
    @Test
    public void testScanOnChangelogSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT b,a,ts FROM src", explainDetails);
    }

    /**
     * Creates a changelog table ({@code I,UA,UB,D}) and an insert-only table, then verifies the
     * rel plan of a UNION ALL between the changelog table and an aggregation over the insert-only
     * table, including the changelog-mode explain detail.
     */
    @Test
    public void testUnionChangelogSourceAndAggregation() {
        StreamTableTestUtil testUtil = util();
        String changelogDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE changelog_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(changelogDdl);
        String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);
        String unionSql = new StringOps(Predef$.MODULE$.augmentString("\n                  |SELECT b, ts, a\n                  |FROM (\n                  |  SELECT * FROM changelog_src\n                  |  UNION ALL\n                  |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n                  |)\n                  |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionSql, explainDetails);
    }

    /**
     * Creates table {@code src} with changelog mode {@code I,UA,UB} and verifies the rel plan of a
     * filtered {@code COUNT(*)}, including the changelog-mode explain detail.
     */
    @Test
    public void testAggregateOnChangelogSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UA,UB'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT COUNT(*) FROM src WHERE a > 1", explainDetails);
    }

    /** Runs the shared join verification with a {@code rates_history} changelog mode of {@code I,UB,UA}. */
    @Test
    public void testJoinOnChangelogSource() {
        String changelogMode = "I,UB,UA";
        verifyJoinOnSource(changelogMode);
    }

    /**
     * Sets {@code TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to true on the table config, then runs
     * the shared join verification with a changelog mode of {@code I,UB,UA}.
     */
    @Test
    public void testJoinOnChangelogSourceWithEventsDuplicate() {
        TableConfig tableConfig = this.util().tableEnv().getConfig();
        tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        String changelogMode = "I,UB,UA";
        verifyJoinOnSource(changelogMode);
    }

    /** Runs the shared join verification with a {@code rates_history} changelog mode of {@code I,D}. */
    @Test
    public void testJoinOnNoUpdateSource() {
        String changelogMode = "I,D";
        verifyJoinOnSource(changelogMode);
    }

    /** Runs the shared join verification with a {@code rates_history} changelog mode of {@code UA,D}. */
    @Test
    public void testJoinOnUpsertSource() {
        String changelogMode = "UA,D";
        verifyJoinOnSource(changelogMode);
    }

    /**
     * Creates an insert-only {@code orders} table and a {@code rates_history} table with the given
     * changelog mode, then verifies the rel plan (with changelog-mode explain detail) of a regular
     * join between them on currency id and currency name.
     *
     * @param changelogMode value spliced into the {@code 'changelog-mode'} option of
     *     {@code rates_history}
     */
    private void verifyJoinOnSource(String changelogMode) {
        StreamTableTestUtil testUtil = util();

        String ordersDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE orders (\n                    |  amount BIGINT,\n                    |  currency_id BIGINT,\n                    |  currency_name STRING\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I'\n                    |)\n                    |")).stripMargin();
        testUtil.addTable(ordersDdl);

        String rawRatesDdl = "\n                     |CREATE TABLE rates_history (\n                     |  currency_id BIGINT,\n                     |  currency_name STRING,\n                     |  rate BIGINT,\n                     |  PRIMARY KEY (currency_id) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = '" + changelogMode + "'\n                     |)\n      ";
        String ratesDdl = new StringOps(Predef$.MODULE$.augmentString(rawRatesDdl)).stripMargin();
        testUtil.addTable(ratesDdl);

        String joinSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT o.currency_name, o.amount, r.rate, o.amount * r.rate\n        |FROM orders AS o JOIN rates_history AS r\n        |ON o.currency_id = r.currency_id AND o.currency_name = r.currency_name\n        |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(joinSql, explainDetails);
    }

    /**
     * Creates table {@code src} with a watermark on {@code ts} and changelog mode
     * {@code I,UB,UA,D}, then verifies the rel plan of a filtered scan, including the
     * changelog-mode explain detail.
     */
    @Test
    public void testWatermarkAndChangelogSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT * FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Sets {@code TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to true, creates a primary-keyed
     * changelog table {@code src} ({@code I,UB,UA,D}) with computed columns and a watermark, then
     * verifies the rel plan of a filtered projection, including the changelog-mode explain detail.
     */
    @Test
    public void testChangelogSourceWithEventsDuplicate() {
        StreamTableTestUtil testUtil = util();
        TableConfig tableConfig = testUtil.tableEnv().getConfig();
        tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I,UB,UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Creates table {@code src} with composite primary key {@code (id2, id1)} and changelog mode
     * {@code UA}, then verifies the rel plan of a projection, including the changelog-mode explain
     * detail.
     */
    @Test
    public void testScanOnUpsertSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  id1 STRING,\n                    |  a INT,\n                    |  id2 BIGINT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (id2, id1) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id1, a, b FROM src", explainDetails);
    }

    /**
     * Creates a primary-keyed table {@code src} with changelog mode {@code UA,D}, computed columns
     * and a watermark on the computed {@code ts}, then verifies the rel plan of a filtered
     * projection, including the changelog-mode explain detail.
     */
    @Test
    public void testUpsertSourceWithComputedColumnAndWatermark() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, b, c FROM src WHERE a > 1", explainDetails);
    }

    /**
     * Creates a primary-keyed table {@code src} with changelog mode {@code UA,D},
     * {@code 'enable-watermark-push-down' = 'true'} and {@code 'disable-lookup' = 'true'}, then
     * verifies the rel plan of {@code SELECT id, ts}, including the changelog-mode explain detail.
     */
    @Test
    public void testUpsertSourceWithWatermarkPushDown() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  id STRING,\n                    |  a INT,\n                    |  b AS a + 1,\n                    |  c STRING,\n                    |  ts as to_timestamp(c),\n                    |  PRIMARY KEY (id) NOT ENFORCED,\n                    |  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D',\n                    |  'enable-watermark-push-down' = 'true',\n                    |  'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT id, ts FROM src", explainDetails);
    }

    /**
     * Creates a primary-keyed table with changelog mode {@code UA,D} and an insert-only table,
     * then verifies the rel plan of a UNION ALL between the first table and an aggregation over
     * the second, including the changelog-mode explain detail.
     */
    @Test
    public void testUnionUpsertSourceAndAggregation() {
        StreamTableTestUtil testUtil = util();
        String upsertDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE upsert_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(upsertDdl);
        String appendDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE append_src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'I'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(appendDdl);
        String unionSql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, ts, a\n        |FROM (\n        |  SELECT * FROM upsert_src\n        |  UNION ALL\n        |  SELECT MAX(ts) as t, a, MAX(b) as b FROM append_src GROUP BY a\n        |)\n        |")).stripMargin();
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan(unionSql, explainDetails);
    }

    /**
     * Creates table {@code src} keyed on {@code a} with changelog mode {@code UA,D} and verifies
     * the rel plan of an aggregation grouped by the non-key column {@code b}, including the
     * changelog-mode explain detail.
     */
    @Test
    public void testAggregateOnUpsertSource() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT b, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY b", explainDetails);
    }

    /**
     * Creates table {@code src} keyed on {@code a} with changelog mode {@code UA,D} and verifies
     * the rel plan of an aggregation grouped by the primary-key column {@code a}, including the
     * changelog-mode explain detail.
     */
    @Test
    public void testAggregateOnUpsertSourcePrimaryKey() {
        StreamTableTestUtil testUtil = util();
        String ddl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE src (\n                    |  ts TIMESTAMP(3),\n                    |  a INT,\n                    |  b DOUBLE,\n                    |  c STRING,\n                    |  PRIMARY KEY (a) NOT ENFORCED\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'changelog-mode' = 'UA,D'\n                    |)\n      ")).stripMargin();
        testUtil.addTable(ddl);
        Seq<ExplainDetail> explainDetails = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        testUtil.verifyRelPlan("SELECT a, COUNT(*), MAX(ts), MIN(ts) FROM src GROUP BY a", explainDetails);
    }

    /**
     * Registers an insert-only {@code orders} table with a {@code PROCTIME()} column and a
     * {@code rates_history} table (changelog mode {@code UA,D}, primary key {@code currency}),
     * then verifies the plan of a {@code FOR SYSTEM_TIME AS OF o.proctime} left join between
     * them, requesting the {@link ExplainDetail#CHANGELOG_MODE} detail.
     */
    @Test
    public void testProcTimeTemporalJoinOnUpsertSource() {
        String ordersDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE orders (\n"
                        + "                    |  amount BIGINT,\n"
                        + "                    |  currency STRING,\n"
                        + "                    |  proctime AS PROCTIME()\n"
                        + "                    |) WITH (\n"
                        + "                    | 'connector' = 'values',\n"
                        + "                    | 'changelog-mode' = 'I'\n"
                        + "                    |)\n"
                        + "                    |")).stripMargin();
        String ratesDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE rates_history (\n"
                        + "                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n"
                        + "                    |  rate BIGINT\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'UA,D',\n"
                        + "                    |  'disable-lookup' = 'true'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin();
        String joinQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n"
                        + "        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.proctime AS r\n"
                        + "        |ON o.currency = r.currency\n"
                        + "        |")).stripMargin();

        // Tables are registered in the same order as before: orders first, then rates_history.
        this.util().addTable(ordersDdl);
        this.util().addTable(ratesDdl);

        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(joinQuery, details);
    }

    /**
     * Registers an insert-only {@code orders} table and a {@code rates_history} table
     * (changelog mode {@code UA,D}, primary key {@code currency}), both with a watermark on
     * {@code rowtime}, then verifies the plan of a {@code FOR SYSTEM_TIME AS OF o.rowtime} left
     * join between them, requesting the {@link ExplainDetail#CHANGELOG_MODE} detail.
     */
    @Test
    public void testEventTimeTemporalJoinOnUpsertSource() {
        String ordersDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE orders (\n"
                        + "                    |  amount BIGINT,\n"
                        + "                    |  currency STRING,\n"
                        + "                    |  rowtime TIMESTAMP(3),\n"
                        + "                    |  WATERMARK FOR rowtime AS rowtime\n"
                        + "                    |) WITH (\n"
                        + "                    | 'connector' = 'values',\n"
                        + "                    | 'changelog-mode' = 'I'\n"
                        + "                    |)\n"
                        + "                    |")).stripMargin();
        String ratesDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE rates_history (\n"
                        + "                    |  currency STRING PRIMARY KEY NOT ENFORCED,\n"
                        + "                    |  rate BIGINT,\n"
                        + "                    |  rowtime TIMESTAMP(3),\n"
                        + "                    |  WATERMARK FOR rowtime AS rowtime\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'UA,D',\n"
                        + "                    |  'disable-lookup' = 'true'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin();
        String joinQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT o.currency, o.amount, r.rate, o.amount * r.rate\n"
                        + "        |FROM orders AS o LEFT JOIN rates_history FOR SYSTEM_TIME AS OF o.rowtime AS r\n"
                        + "        |ON o.currency = r.currency\n"
                        + "        |")).stripMargin();

        // Tables are registered in the same order as before: orders first, then rates_history.
        this.util().addTable(ordersDdl);
        this.util().addTable(ratesDdl);

        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(joinQuery, details);
    }

    /**
     * Registers table {@code src} with changelog mode {@code I,UA,UB} and a {@code PROCTIME()}
     * column, then verifies the plan of a 10-second {@code TUMBLE} window count over it,
     * requesting the {@link ExplainDetail#CHANGELOG_MODE} detail.
     */
    @Test
    public void testWindowAggregateOnChangelogSource() {
        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE src (\n"
                        + "                    |  ts AS PROCTIME(),\n"
                        + "                    |  a INT,\n"
                        + "                    |  b DOUBLE\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'I,UA,UB'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin();
        this.util().addTable(sourceDdl);

        String windowQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT TUMBLE_START(ts, INTERVAL '10' SECOND), COUNT(*)\n"
                        + "        |FROM src\n"
                        + "        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n"
                        + "        |")).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(windowQuery, details);
    }

    /**
     * Registers table {@code src} with changelog mode {@code I,UB,D} and expects
     * {@code verifyRelPlan} on a query against it to throw a {@link ValidationException} whose
     * message reports the invalid source.
     */
    @Test
    public void testInvalidSourceChangelogMode() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE src (\n"
                        + "                    |  ts TIMESTAMP(3),\n"
                        + "                    |  a INT,\n"
                        + "                    |  b DOUBLE\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'I,UB,D'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin());

        // Use AssertJ's isInstanceOf(Class) so the type of the thrown exception is really
        // asserted; a plain type test on the assertion object is a discarded no-op.
        Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE})))
                .hasMessageContaining("Invalid source for table 'default_catalog.default_database.src'. A ScanTableSource doesn't support a changelog stream that contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class 'org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanLookupTableSource'.")
                .isInstanceOf(ValidationException.class);
    }

    /**
     * Registers table {@code src} with changelog mode {@code I,UA,D} and no primary key, and
     * expects {@code verifyRelPlan} on a query against it to throw a {@link TableException}
     * whose message asks for a primary key constraint.
     */
    @Test
    public void testMissingPrimaryKeyForUpsertSource() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE src (\n"
                        + "                    |  ts TIMESTAMP(3),\n"
                        + "                    |  a INT,\n"
                        + "                    |  b DOUBLE\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'I,UA,D'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin());

        // Use AssertJ's isInstanceOf(Class) so the type of the thrown exception is really
        // asserted; a plain type test on the assertion object is a discarded no-op.
        Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE})))
                .hasMessageContaining("Table 'default_catalog.default_database.src' produces a changelog stream that contains UPDATE_AFTER but no UPDATE_BEFORE. This requires defining a primary key constraint on the table.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Registers table {@code src} with changelog mode {@code I,UB,UA,D} and no primary key,
     * sets {@link ExecutionConfigOptions#TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} to
     * {@code true}, and expects {@code verifyRelPlan} on a query against it to throw a
     * {@link TableException} whose message reports the missing primary key.
     */
    @Test
    public void testMissingPrimaryKeyForEventsDuplicate() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE src (\n"
                        + "                    |  ts TIMESTAMP(3),\n"
                        + "                    |  a INT,\n"
                        + "                    |  b DOUBLE\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'changelog-mode' = 'I,UB,UA,D'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin());
        this.util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE, (Object)BoxesRunTime.boxToBoolean(true));

        // Use AssertJ's isInstanceOf(Class) so the type of the thrown exception is really
        // asserted; a plain type test on the assertion object is a discarded no-op.
        Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src WHERE a > 1", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE})))
                .hasMessageContaining("Configuration 'table.exec.source.cdc-events-duplicate' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table 'default_catalog.default_database.src' doesn't have a primary key.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Registers table {@code src} whose {@code table-source-class} option is set to the class
     * name of {@link TestValuesTableFactory.MockedLookupTableSource}, and expects
     * {@code verifyRelPlan} on {@code SELECT * FROM src} to throw a {@link ValidationException}
     * whose message says no valid execution plan can be generated.
     */
    @Test
    public void testInvalidScanOnLookupSource() {
        String sourceDdl = "\n"
                + "                     |CREATE TABLE src (\n"
                + "                     |  ts TIMESTAMP(3),\n"
                + "                     |  a INT,\n"
                + "                     |  b DOUBLE\n"
                + "                     |) WITH (\n"
                + "                     |  'connector' = 'values',\n"
                + "                     |  'table-source-class' = '" + TestValuesTableFactory.MockedLookupTableSource.class.getName() + "'\n"
                + "                     |)\n"
                + "      ";
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString(sourceDdl)).stripMargin());

        // Use AssertJ's isInstanceOf(Class) so the type of the thrown exception is really
        // asserted; a plain type test on the assertion object is a discarded no-op.
        // NOTE(review): the expected type is the one named by that unenforced type test;
        // confirm it matches what the planner actually throws.
        Assertions.assertThatThrownBy(() -> this.util().verifyRelPlan("SELECT * FROM src", (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE})))
                .hasMessageContaining("Cannot generate a valid execution plan for the given query")
                .isInstanceOf(ValidationException.class);
    }

    /**
     * Expects {@code addTable} to throw a {@link ValidationException} for a table whose
     * watermark expression is the empty string literal {@code ''}, with a message reporting the
     * invalid watermark expression data type.
     */
    @Test
    public void testInvalidWatermarkOutputType() {
        String invalidDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                                             |CREATE TABLE src (\n"
                        + "                                             |  ts TIMESTAMP(3),\n"
                        + "                                             |  a INT,\n"
                        + "                                             |  b DOUBLE,\n"
                        + "                                             |  WATERMARK FOR `ts` AS ''\n"
                        + "                                             |) WITH (\n"
                        + "                                             |  'connector' = 'values'\n"
                        + "                                             |)\n"
                        + "      ")).stripMargin();

        // Use AssertJ's isInstanceOf(Class) so the type of the thrown exception is really
        // asserted; a plain type test on the assertion object is a discarded no-op.
        Assertions.assertThatThrownBy(() -> this.util().addTable(invalidDdl))
                .hasMessageContaining("Invalid data type of expression for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the watermark expression type is CHAR(0) NOT NULL")
                .isInstanceOf(ValidationException.class);
    }

    /**
     * Builds a stream test util from a {@link TableConfig} whose
     * {@link ExecutionConfigOptions#TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM} is 10, registers
     * two tables that declare {@code scan.parallelism} 5 and 3, and verifies the explain output
     * of a left join between them with the {@link ExplainDetail#JSON_EXECUTION_PLAN} detail.
     */
    @Test
    public void testSetParallelismForSource() {
        TableConfig tableConfig = TableConfig.getDefault();
        tableConfig.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, (Object)BoxesRunTime.boxToInteger(10));
        StreamTableTestUtil testUtil = this.streamTestUtil(tableConfig);

        String changelogDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE changelog_src (\n"
                        + "                    |  id INT,\n"
                        + "                    |  a STRING,\n"
                        + "                    |  PRIMARY KEY (id) NOT ENFORCED\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'bounded' = 'true',\n"
                        + "                    |  'runtime-source' = 'DataStream',\n"
                        + "                    |  'scan.parallelism' = '5',\n"
                        + "                    |  'enable-projection-push-down' = 'false',\n"
                        + "                    |  'changelog-mode' = 'I,UA,D'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin();
        testUtil.addTable(changelogDdl);

        String sourceDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                    |CREATE TABLE src (\n"
                        + "                    |  id INT,\n"
                        + "                    |  b STRING,\n"
                        + "                    |  c INT\n"
                        + "                    |) WITH (\n"
                        + "                    |  'connector' = 'values',\n"
                        + "                    |  'bounded' = 'true',\n"
                        + "                    |  'runtime-source' = 'DataStream',\n"
                        + "                    |  'scan.parallelism' = '3',\n"
                        + "                    |  'enable-projection-push-down' = 'false'\n"
                        + "                    |)\n"
                        + "      ")).stripMargin();
        testUtil.addTable(sourceDdl);

        String joinQuery = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT *\n"
                        + "        |FROM src LEFT JOIN changelog_src\n"
                        + "        |ON src.id = changelog_src.id WHERE src.c > 1\n"
                        + "        |")).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        testUtil.verifyExplain(joinQuery, details);
    }
}

