/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.runtime.functions.aggregate.FirstValueAggFunction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005%b\u0001\u0002\u0012$\u0001QBQa\u000f\u0001\u0005\u0002qBqa\u0010\u0001C\u0002\u0013%\u0001\t\u0003\u0004E\u0001\u0001\u0006I!\u0011\u0005\u0006\u000b\u0002!\tA\u0012\u0005\u00061\u0002!\tA\u0012\u0005\u0006;\u0002!\tA\u0012\u0005\u0006?\u0002!\tA\u0012\u0005\u0006C\u0002!\tA\u0012\u0005\u0006G\u0002!\tA\u0012\u0005\u0006K\u0002!\tA\u0012\u0005\u0006O\u0002!\tA\u0012\u0005\u0006S\u0002!\tA\u0012\u0005\u0006W\u0002!\tA\u0012\u0005\u0006[\u0002!\tA\u0012\u0005\u0006_\u0002!\tA\u0012\u0005\u0006c\u0002!\tA\u0012\u0005\u0006g\u0002!\tA\u0012\u0005\u0006k\u0002!\tA\u0012\u0005\u0006o\u0002!\tA\u0012\u0005\u0006s\u0002!\tA\u0012\u0005\u0006w\u0002!\tA\u0012\u0005\u0006{\u0002!\tA\u0012\u0005\u0006\u007f\u0002!\tA\u0012\u0005\u0007\u0003\u0007\u0001A\u0011\u0001$\t\r\u0005\u001d\u0001\u0001\"\u0001G\u0011\u0019\tY\u0001\u0001C\u0001\r\"1\u0011q\u0002\u0001\u0005\u0002\u0019Ca!a\u0005\u0001\t\u00031\u0005BBA\f\u0001\u0011\u0005a\t\u0003\u0004\u0002\u001c\u0001!\tA\u0012\u0005\u0007\u0003?\u0001A\u0011\u0001$\t\r\u0005\r\u0002\u0001\"\u0001G\u0011\u0019\t9\u0003\u0001C\u0005\r\n\u00012+\u001e2qY\u0006t'+Z;tKR+7\u000f\u001e\u0006\u0003I\u0015\n1a]9m\u0015\t1s%\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003Q%\nA\u0001\u001d7b]*\u0011!fK\u0001\ba2\fgN\\3s\u0015\taS&A\u0003uC\ndWM\u0003\u0002/_\u0005)a\r\\5oW*\u0011\u0001'M\u0001\u0007CB\f7\r[3\u000b\u0003I\n1a\u001c:h\u0007\u0001\u0019\"\u0001A\u001b\u0011\u0005YJT\"A\u001c\u000b\u0005aJ\u0013!B;uS2\u001c\u0018B\u0001\u001e8\u00055!\u0016M\u00197f)\u0016\u001cHOQ1tK\u00061A(\u001b8jiz\"\u0012!\u0010\t\u0003}\u0001i\u0011aI\u0001\u0005kRLG.F\u0001B!\t1$)\u0003\u0002Do\t\u00192\u000b\u001e:fC6$\u0016M\u00197f)\u0016\u001cH/\u0016;jY\u0006)Q\u000f^5mA\u00051!-\u001a4pe\u0016$\u0012a\u0012\t\u0003\u0011.k\u0011!\u0013\u0006\u0002\u0015\u0006)1oY1mC&\u0011A*\u0013\u0002\u0005+:LG\u000f\u000b\u0002\u0005\u001dB\u0011qJV\u0007\u0002!*\u0011\u0011KU\u0001\u0004CBL'BA*U\u0003\u001dQW\u000f]5uKJT!!V\u00
19\u0002\u000b),h.\u001b;\n\u0005]\u0003&A\u0003\"fM>\u0014X-R1dQ\u00069B/Z:u\t&\u001c\u0018M\u00197f'V\u0014\u0007\u000f\\1o%\u0016,8/\u001a\u0015\u0003\u000bi\u0003\"aT.\n\u0005q\u0003&\u0001\u0002+fgR\fA\u0005^3tiN+(\r\u001d7b]J+Wo]3XSRDG)\u001b4gKJ,g\u000e\u001e*poRK\b/\u001a\u0015\u0003\ri\u000b!\u0004^3ti\u0016s\u0017M\u00197f%\u0016,8/\u001a+bE2,7k\\;sG\u0016D#a\u0002.\u00027Q,7\u000f\u001e#jg\u0006\u0014G.\u001a*fkN,G+\u00192mKN{WO]2fQ\tA!,\u0001\fuKN$8+\u001e2qY\u0006t'+Z;tK>s7)\u00197dQ\tI!,A\u0019uKN$8+\u001e2qY\u0006t'+Z;tK>s7)\u00197d/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2Qe>TWm\u0019;)\u0005)Q\u0016!\f;fgR\u001cVO\u00199mC:\u0014V-^:f\u001f:\u001c\u0015\r\\2XSRDgj\u001c8EKR,'/\\5oSN$\u0018nY+eM\"\u00121BW\u0001\u001bi\u0016\u001cHoU;ca2\fgNU3vg\u0016|e.\u0012=dQ\u0006tw-\u001a\u0015\u0003\u0019i\u000b\u0001\u0005^3tiN+(\r\u001d7b]J+Wo]3P]\u001e\u0013x.\u001e9BO\u001e\u0014XmZ1uK\"\u0012QBW\u00017i\u0016\u001cHoU;ca2\fgNU3vg\u0016|e.Q4he\u0016<\u0017\r^3XSRDgj\u001c8EKR,'/\\5oSN$\u0018nY!hO\u000e\u000bG\u000e\u001c\u0015\u0003\u001di\u000ba\u0003^3tiN+(\r\u001d7b]J+Wo]3P]N{'\u000f\u001e\u0015\u0003\u001fi\u000bq\u0003^3tiN+(\r\u001d7b]J+Wo]3P]2KW.\u001b;)\u0005AQ\u0016a\u0007;fgR\u001cVO\u00199mC:\u0014V-^:f\u001f:\u001cvN\u001d;MS6LG\u000f\u000b\u0002\u00125\u00061B/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u0015>Lg\u000e\u000b\u0002\u00135\u0006\u0019D/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u0015>LgNT8o\t\u0016$XM]7j]&\u001cH/[2K_&t7i\u001c8eSRLwN\u001c\u0015\u0003'i\u000bA\u0004^3tiN+(\r\u001d7b]J+Wo]3P]>3XM],j]\u0012|w\u000f\u000b\u0002\u00155\u00069D/Z:u'V\u0014\u0007\u000f\\1o%\u0016,8/Z(o\u001fZ,'oV5oI><x+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e\fumZ\"bY2D#!\u0006.\u00027Q,7\u000f^*vEBd\u0017M\u001c*fkN,wJ\\\"peJ,G.\u0019;fQ\t1\",A\u001auKN$8+\u001e2qY\u0006t'+Z;tK>s7i\u001c:sK2\fG/Z,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-\u0016#U\r\"\u0012qCW\u0001$i\u0016\u001cHoU;ca2\fgNU3vg\u0016<\u0016\u000e\u001e5Es:\fW.[2Gk:\u001cG/[8oQ\tA\",A\u0013uKN$X
I\\1cY\u0016\u0014V-^:f)\u0006\u0014G.Z*pkJ\u001cWm\u00148OK^\u001cv.\u001e:dK\"\u0012\u0011DW\u0001'i\u0016\u001cH\u000fR5tC\ndWMU3vg\u0016$\u0016M\u00197f'>,(oY3P]:+woU8ve\u000e,\u0007F\u0001\u000e[\u0003Y\"Xm\u001d;SKV\u001cX\rV1cY\u0016\u001cv.\u001e:dK^KG\u000f\u001b)s_*,7\r\u001e)vg\"$un\u001e8B]\u0012lU\r^1ECR\f7*Z=2Q\tY\",A\u001buKN$(+Z;tKR\u000b'\r\\3T_V\u00148-Z,ji\"\u0004&o\u001c6fGR\u0004Vo\u001d5E_^t\u0017I\u001c3NKR\fG)\u0019;b\u0017\u0016L\bF\u0001\u000f[\u0003Y\"Xm\u001d;T_V\u00148-\u001a*fkN,w+\u001b;i\u000b6\u0004H/\u001f$jYR,'oQ8oI\u0006sG-S4o_J,W)\u001c9us\u001aKG\u000e^3sQ\ti\",\u0001\u001euKN$8k\\;sG\u0016\u0014V-^:f/&$\b.R7qif4\u0015\u000e\u001c;fe\u000e{g\u000eZ!oI&;gn\u001c:f\u000b6\u0004H/\u001f$jYR,'\u000f\u0016:vK\"\u0012aDW\u0001<i\u0016\u001cHoU8ve\u000e,'+Z;tK^KG\u000f[#naRLh)\u001b7uKJ\u001cuN\u001c3B]\u0012LuM\\8sK\u0016k\u0007\u000f^=GS2$XM\u001d+sk\u0016\u0014\u0004FA\u0010[\u0003m\"Xm\u001d;T_V\u00148-\u001a*fkN,w+\u001b;i\u000b6\u0004H/\u001f$jYR,'oQ8oI\u0006sG-S4o_J,W)\u001c9us\u001aKG\u000e^3s)J,Xm\r\u0015\u0003Ai\u000bA\u0003^3tiJ+Wo]3P]:+woU8ve\u000e,\u0007")
public class SubplanReuseTest
extends TableTestBase {
    /** Test utility shared by every test method; built once per test-class instance. */
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());

    /** Accessor for the {@link #util} field. */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Per-test setup (JUnit {@code @BeforeEach}).
     *
     * <p>Sets {@code TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED} to {@code true} and
     * {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code false}, then registers two table
     * sources: {@code x} with fields {@code a, b, c} and {@code y} with fields {@code d, e, f}.
     * Both use a {@code Tuple3<Object, Object, String>} case-class type info whose
     * {@code createInstance} unboxes field 0 as an int and field 1 as a long.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} bodies below are decompiler output
     * and are not valid Java as written (for example {@code fieldSerializers$1} is never declared
     * and the {@code $deserializeLambda$} bodies use non-Java syntax). Treat them as a rendering
     * of generated code rather than as hand-maintained logic.
     */
    @BeforeEach
    public void before() {
        // Individual tests override these two options where they need a different setting.
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)false));
        // Table source "x" with fields a, b, c.
        this.util().addTableSource("x", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the type info.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and wraps them in a
            // ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous serializer is instantiated but never used; the plain
                // ScalaCaseClassSerializer created below is what gets returned.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Lambda deserialization hook emitted for the serializable lambda above.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Table source "y" with fields d, e, f; the type info mirrors the one used for "x".
        this.util().addTableSource("y", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "e")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "f"))}), new CaseClassTypeInfo<Tuple3<Object, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the type info.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and wraps them in a
            // ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This anonymous serializer is instantiated but never used; the plain
                // ScalaCaseClassSerializer created below is what gets returned.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Lambda deserialization hook emitted for the serializable lambda above.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.SubplanReuseTest$$anon$3 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
    }

    /**
     * Switches {@code TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED} off and hands a query that joins the
     * CTE {@code r} with itself to {@code verifyRelPlanNotExpected}, with {@code "Reused"} as the
     * string that must not be expected.
     */
    @Test
    public void testDisableSubplanReuse() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, Boolean.FALSE);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (\n        | SELECT a, SUM(b) as b, SUM(e) as e FROM x, y WHERE a = d AND c > 100 GROUP BY a\n        |)\n        |SELECT r1.a, r1.b, r2.e FROM r r1, r r2 WHERE r1.b > 10 AND r2.e < 20 AND r1.a = r2.a\n      ")).stripMargin();
        final Seq<String> notExpected = (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reused"});
        testUtil.verifyRelPlanNotExpected(sql, notExpected);
    }

    /**
     * Joins two aggregations over {@code x} that differ only in the cast target of {@code a}
     * (BIGINT vs. DOUBLE) and hands the query to {@code verifyRelPlanNotExpected} with
     * {@code "Reused"} as the not-expected string. Source reuse is explicitly set to
     * {@code false} first.
     */
    @Test
    public void testSubplanReuseWithDifferentRowType() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.FALSE);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t1 AS (SELECT CAST(a as BIGINT) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as BIGINT)),\n        |     t2 AS (SELECT CAST(a as DOUBLE) AS a, SUM(b) AS b FROM x GROUP BY CAST(a as DOUBLE))\n        |SELECT t1.*, t2.* FROM t1, t2 WHERE t1.b = t2.b\n      ")).stripMargin();
        final Seq<String> notExpected = (Seq<String>)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Reused"});
        testUtil.verifyRelPlanNotExpected(sql, notExpected);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code true} and verifies the exec
     * plan of a self-join over a CTE that joins {@code x} and {@code y}.
     */
    @Test
    public void testEnableReuseTableSource() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.TRUE);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (SELECT x.a AS a, x.b AS b, y.d AS d, y.e AS e FROM x, y WHERE x.a = y.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code false} and verifies the exec
     * plan of a self-join over a CTE that selects everything from {@code x} joined with {@code y}.
     */
    @Test
    public void testDisableReuseTableSource() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.FALSE);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (SELECT * FROM x, y WHERE x.a = y.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a UNION ALL whose two branches are identical joins between the
     * filtered CTE {@code r} and {@code y}.
     */
    @Test
    public void testSubplanReuseOnCalc() {
        final String margined = "\n        |WITH r AS (SELECT a, b, c FROM x WHERE c LIKE 'test%')\n        |(SELECT r.a, LOWER(c) AS c, y.e FROM r, y WHERE r.a = y.d)\n        |UNION ALL\n        |(SELECT r.a, LOWER(c) AS c, y.e FROM r, y WHERE r.a = y.d)\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Registers {@code random_udf} as a {@code NonDeterministicUdf} and verifies the exec plan of
     * a UNION ALL whose two identical branches call that function in the projection.
     */
    @Test
    public void testSubplanReuseOnCalcWithNonDeterministicProject() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction randomUdf = new JavaUserDefinedScalarFunctions.NonDeterministicUdf();
        testUtil.addTemporarySystemFunction("random_udf", randomUdf);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |(SELECT a, random_udf() FROM x WHERE a > 10)\n        |UNION ALL\n        |(SELECT a, random_udf() FROM x WHERE a > 10)\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Registers {@code random_udf} as a {@code NonDeterministicUdf} and verifies the exec plan of
     * a UNION ALL whose two identical branches call that function in the WHERE clause.
     */
    @Test
    public void testSubplanReuseOnCalcWithNonDeterministicUdf() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction randomUdf = new JavaUserDefinedScalarFunctions.NonDeterministicUdf();
        testUtil.addTemporarySystemFunction("random_udf", randomUdf);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |(SELECT a FROM x WHERE b > random_udf(a))\n        |UNION ALL\n        |(SELECT a FROM x WHERE b > random_udf(a))\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a UNION ALL whose branches both join the CTE {@code r} with
     * {@code y} on {@code a = d} but apply different extra filters on {@code y}.
     */
    @Test
    public void testSubplanReuseOnExchange() {
        final String margined = "\n        |WITH r AS (SELECT a, b, c FROM x WHERE c LIKE 'test%')\n        |SELECT * FROM r, y WHERE a = d AND e > 10\n        |UNION ALL\n        |SELECT * FROM r, y WHERE a = d AND f <> ''\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /** Verifies the exec plan of a self-join over a CTE that groups {@code x} by {@code c}. */
    @Test
    public void testSubplanReuseOnGroupAggregate() {
        final String margined = "\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.b AND r2.a > 1\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Registers the aggregate functions {@code MyFirst} (INT) and {@code MyLast} (BIGINT) and
     * verifies the exec plan of a self-join over a CTE that aggregates {@code x} with them.
     */
    @Test
    public void testSubplanReuseOnAggregateWithNonDeterministicAggCall() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction firstOverInt = new FirstValueAggFunction(DataTypes.INT().getLogicalType());
        testUtil.addTemporarySystemFunction("MyFirst", firstOverInt);
        // NOTE(review): "MyLast" is backed by FirstValueAggFunction as well; confirm whether a
        // last-value aggregate was intended before changing it (expected plans may depend on it).
        final UserDefinedFunction firstOverBigint = new FirstValueAggFunction(DataTypes.BIGINT().getLogicalType());
        testUtil.addTemporarySystemFunction("MyLast", firstOverBigint);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT c, MyFirst(a) a, MyLast(b) b FROM x GROUP BY c)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.b AND r2.a > 1\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a self-join over a CTE that aggregates {@code x} and orders the
     * result by {@code a, b DESC}.
     */
    @Test
    public void testSubplanReuseOnSort() {
        final String margined = "\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.a > 1 AND r2.b < 10\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Sets {@code TABLE_EXEC_DISABLED_OPERATORS} to {@code "HashJoin,SortMergeJoin"} and verifies
     * the exec plan of a UNION ALL whose branches both read the {@code LIMIT 10} CTE {@code r}.
     */
    @Test
    public void testSubplanReuseOnLimit() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_DISABLED_OPERATORS, "HashJoin,SortMergeJoin");
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b FROM x LIMIT 10)\n        |\n        |SELECT a, b FROM r WHERE a > 10\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r WHERE b < 10\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a UNION ALL whose branches both read a CTE that aggregates
     * {@code x}, orders by {@code a, b DESC} and keeps 10 rows.
     */
    @Test
    public void testSubplanReuseOnSortLimit() {
        final String margined = "\n        |WITH r AS (SELECT c, SUM(a) a, SUM(b) b FROM x GROUP BY c ORDER BY a, b DESC LIMIT 10)\n        |\n        |SELECT a, b FROM r WHERE a > 10\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r WHERE b < 10\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a UNION ALL whose branches both read a CTE built from a
     * FULL OUTER JOIN of {@code x} and {@code y}.
     */
    @Test
    public void testSubplanReuseOnJoin() {
        final String margined = "\n        |WITH r AS (SELECT * FROM x FULL OUTER JOIN y ON ABS(a) = ABS(d) OR c = f\n        |           WHERE b > 1 and e < 2)\n        |\n        |SELECT a, b FROM r\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Registers {@code random_udf} as a {@code NonDeterministicUdf} and verifies the exec plan of
     * a UNION ALL over a CTE whose FULL OUTER JOIN condition calls that function.
     */
    @Test
    public void testSubplanReuseOnJoinNonDeterministicJoinCondition() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction randomUdf = new JavaUserDefinedScalarFunctions.NonDeterministicUdf();
        testUtil.addTemporarySystemFunction("random_udf", randomUdf);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT * FROM x FULL OUTER JOIN y ON random_udf(a) = random_udf(d) OR c = f\n        |           WHERE b > 1 and e < 2)\n        |\n        |SELECT a, b FROM r\n        |UNION ALL\n        |SELECT a, b * 2 AS b FROM r\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Verifies the exec plan of a self-join over a CTE that computes
     * {@code RANK() OVER (ORDER BY c DESC)} on {@code x}.
     */
    @Test
    public void testSubplanReuseOnOverWindow() {
        final String margined = "\n        |WITH r AS (SELECT a, b, RANK() OVER (ORDER BY c DESC) FROM x)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.b < 100 AND r2.b > 10\n      ";
        final String sql = new StringOps(Predef$.MODULE$.augmentString(margined)).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Registers {@code MyFirst} as a {@code FirstValueAggFunction} over STRING and verifies the
     * exec plan of a self-join over a CTE that applies it as an OVER-window aggregate.
     */
    @Test
    public void testSubplanReuseOnOverWindowWithNonDeterministicAggCall() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction firstOverString = new FirstValueAggFunction(DataTypes.STRING().getLogicalType());
        testUtil.addTemporarySystemFunction("MyFirst", firstOverString);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, MyFirst(c) OVER (PARTITION BY c ORDER BY c DESC) FROM x)\n        |SELECT * FROM r r1, r r2 WHERE r1.a = r2.a AND r1.b < 100 AND r2.b > 10\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Registers {@code str_split} as a {@code StringSplit} table function and verifies the exec
     * plan of a self-join over a CTE that laterally joins {@code x} with it.
     */
    @Test
    public void testSubplanReuseOnCorrelate() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction splitter = new JavaUserDefinedTableFunctions.StringSplit();
        testUtil.addTemporarySystemFunction("str_split", splitter);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))\n        |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Registers {@code TableFun} as a {@code NonDeterministicTableFunc} and verifies the exec
     * plan of a self-join over a CTE that laterally joins {@code x} with it.
     */
    @Test
    public void testSubplanReuseOnCorrelateWithNonDeterministicUDTF() {
        final StreamTableTestUtil testUtil = this.util();
        final UserDefinedFunction tableFun = new JavaUserDefinedTableFunctions.NonDeterministicTableFunc();
        testUtil.addTemporarySystemFunction("TableFun", tableFun);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH r AS (SELECT a, b, c, s FROM x, LATERAL TABLE(TableFun(c)) AS T(s))\n        |SELECT * FROM r r1, r r2 WHERE r1.c = r2.s\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Builds a {@link Table} from three identical {@code ORDER BY rand() LIMIT 1} sub-queries
     * combined with INTERSECT and verifies its exec plan.
     */
    @Test
    public void testSubplanReuseWithDynamicFunction() {
        final StreamTableTestUtil testUtil = this.util();
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n                                            |INTERSECT\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n                                            |INTERSECT\n                                            |(SELECT a AS random FROM x ORDER BY rand() LIMIT 1)\n      ")).stripMargin();
        final Table intersected = testUtil.tableEnv().sqlQuery(sql);
        testUtil.verifyExecPlan(intersected);
    }

    /**
     * Runs {@code testReuseOnNewSource()} with {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} set
     * to {@code true}.
     */
    @Test
    public void testEnableReuseTableSourceOnNewSource() {
        util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.TRUE);
        testReuseOnNewSource();
    }

    /**
     * Runs {@code testReuseOnNewSource()} with {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} set
     * to {@code false}.
     */
    @Test
    public void testDisableReuseTableSourceOnNewSource() {
        util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.FALSE);
        testReuseOnNewSource();
    }

    /**
     * Creates {@code reuseTable} with metadata columns {@code ts1} and {@code ts2}, two sink
     * tables and two views over {@code reuseTable}, then verifies the exec plan of a statement
     * set that inserts each view into its sink.
     */
    @Test
    public void testReuseTableSourceWithProjectPushDownAndMetaDataKey1() {
        final StreamTableTestUtil testUtil = this.util();

        // Source table and the two sinks.
        final String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE reuseTable (\n         |  a bigint,\n         |  b varchar(64),\n         |  c bigint,\n         |  d STRING,\n         |  ts1 TIMESTAMP(3) METADATA,\n         |  ts2 TIMESTAMP(3) METADATA\n         | ) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'ts1:TIMESTAMP(3), ts2:TIMESTAMP(3)'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(sourceDdl);
        final String firstSinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink1 (\n         |  a1 bigint,\n         |  b1 VARCHAR(32),\n         |  my_time1 timestamp,\n         |  d1 DECIMAL(20,2)\n         | ) WITH (\n         |  'connector' = 'values'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(firstSinkDdl);
        final String secondSinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink2 (\n         |  a2 bigint,\n         | `update_time` timestamp\n         | ) WITH (\n         | 'connector' = 'values'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(secondSinkDdl);

        // Two views projecting different column subsets of the same source table.
        final String firstViewSql = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, b, ts1, CAST(d AS DECIMAL(28,2)) AS d1\n         | FROM reuseTable\n         |")).stripMargin();
        final Table firstView = testUtil.tableEnv().sqlQuery(firstViewSql);
        testUtil.tableEnv().createTemporaryView("view1", firstView);
        final String secondViewSql = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, ts1 AS update_time\n         | FROM reuseTable\n         |")).stripMargin();
        final Table secondView = testUtil.tableEnv().sqlQuery(secondViewSql);
        testUtil.tableEnv().createTemporaryView("view2", secondView);

        final StatementSet inserts = testUtil.tableEnv().createStatementSet();
        inserts.addInsertSql("INSERT INTO sink1 SELECT a, b, ts1, d1 FROM view1");
        inserts.addInsertSql("INSERT INTO sink2 SELECT a, update_time FROM view2");
        testUtil.verifyExecPlan(inserts);
    }

    /**
     * Creates {@code reuseTable} whose metadata columns {@code my_time} and {@code unUse_time}
     * are declared {@code FROM 'ts1'} and {@code FROM 'ts2'}, two sink tables and two views over
     * {@code reuseTable}, then verifies the exec plan of a statement set that inserts each view
     * into its sink.
     */
    @Test
    public void testReuseTableSourceWithProjectPushDownAndMetaDataKey() {
        final StreamTableTestUtil testUtil = this.util();

        // Source table and the two sinks.
        final String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE reuseTable (\n         |  a bigint,\n         |  b varchar(64),\n         |  c bigint,\n         |  d STRING,\n         |  my_time TIMESTAMP(3) METADATA FROM 'ts1',\n         |  unUse_time TIMESTAMP(3) METADATA FROM 'ts2'\n         | ) WITH (\n         |  'connector' = 'values',\n         |  'readable-metadata' = 'ts1:TIMESTAMP(3), ts2:TIMESTAMP(3)'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(sourceDdl);
        final String firstSinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink1 (\n         |  a1 bigint,\n         |  b1 VARCHAR(32),\n         |  my_time1 timestamp,\n         |  d1 DECIMAL(20,2)\n         | ) WITH (\n         |  'connector' = 'values'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(firstSinkDdl);
        final String secondSinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         | CREATE TABLE sink2 (\n         |  a2 bigint,\n         | `update_time` timestamp\n         | ) WITH (\n         | 'connector' = 'values'\n         | )\n         |")).stripMargin();
        testUtil.tableEnv().executeSql(secondSinkDdl);

        // Two views projecting different column subsets of the same source table.
        final String firstViewSql = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, b, my_time, CAST(d AS DECIMAL(28,2)) AS d1\n         | FROM reuseTable\n         |")).stripMargin();
        final Table firstView = testUtil.tableEnv().sqlQuery(firstViewSql);
        testUtil.tableEnv().createTemporaryView("view1", firstView);
        final String secondViewSql = new StringOps(Predef$.MODULE$.augmentString("\n         | SELECT a, my_time AS update_time\n         | FROM reuseTable\n         |")).stripMargin();
        final Table secondView = testUtil.tableEnv().sqlQuery(secondViewSql);
        testUtil.tableEnv().createTemporaryView("view2", secondView);

        final StatementSet inserts = testUtil.tableEnv().createStatementSet();
        inserts.addInsertSql("INSERT INTO sink1 SELECT a, b, my_time, d1 FROM view1");
        inserts.addInsertSql("INSERT INTO sink2 SELECT a, update_time FROM view2");
        testUtil.verifyExecPlan(inserts);
    }

    /**
     * Adds the bounded {@code values} table {@code MyTable} and verifies the exec plan of a
     * self-join on it where only {@code T1} carries an extra filter ({@code T1.a > 5}). No
     * optimizer option is changed here.
     */
    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilter() {
        final StreamTableTestUtil testUtil = this.util();
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(tableDdl);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 5\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code true}, adds the bounded
     * {@code values} table {@code MyTable} and verifies the exec plan of a self-join on it where
     * only {@code T1} carries an extra filter ({@code T1.a > 5}).
     */
    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.TRUE);
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(tableDdl);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 5\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code true}, adds the bounded
     * {@code values} table {@code MyTable} and verifies the exec plan of a self-join on it where
     * only {@code T2} carries an extra filter ({@code T2.a < 10}).
     */
    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue2() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.TRUE);
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(tableDdl);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T2.a < 10\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED} to {@code true}, adds the bounded
     * {@code values} table {@code MyTable} and verifies the exec plan of a self-join on it where
     * both sides carry an extra filter ({@code T1.a > 10} and {@code T2.a < 10}).
     */
    @Test
    public void testSourceReuseWithEmptyFilterCondAndIgnoreEmptyFilterTrue3() {
        final StreamTableTestUtil testUtil = this.util();
        testUtil.tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED, Boolean.TRUE);
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table MyTable(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values',\n                     |  'bounded' = 'true'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(tableDdl);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT * FROM MyTable T1, MyTable T2 WHERE T1.a = T2.a AND T1.a > 10 AND T2.a < 10\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }

    /**
     * Shared body of the two {@code ...OnNewSource} tests: adds the {@code values} tables
     * {@code newX} and {@code newY} and verifies the exec plan of a self-join over a CTE that
     * joins them. Callers set the source-reuse option before invoking this.
     */
    private void testReuseOnNewSource() {
        final StreamTableTestUtil testUtil = this.util();
        final String newXDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table newX(\n                     |  a int,\n                     |  b bigint,\n                     |  c varchar\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(newXDdl);
        final String newYDdl = new StringOps(Predef$.MODULE$.augmentString("\n                     |create table newY(\n                     |  d int,\n                     |  e bigint,\n                     |  f varchar\n                     |) with (\n                     |  'connector' = 'values'\n                     |)\n       ")).stripMargin();
        testUtil.addTable(newYDdl);
        final String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |WITH t AS (\n        |  SELECT newX.a AS a, newX.b AS b, newY.d AS d, newY.e AS e\n        |  FROM newX, newY\n        |  WHERE newX.a = newY.d)\n        |SELECT t1.*, t2.* FROM t t1, t t2 WHERE t1.b = t2.e AND t1.a < 10 AND t2.a > 5\n      ")).stripMargin();
        testUtil.verifyExecPlan(sql);
    }
}

