/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.stream.sql.RankTest$;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\re\u0001\u0002\u001a4\u0001\u0011CQa\u0013\u0001\u0005\u00021Cqa\u0014\u0001C\u0002\u0013%\u0001\u000b\u0003\u0004U\u0001\u0001\u0006I!\u0015\u0005\u0006+\u0002!\tA\u0016\u0005\u0006Q\u0002!\tA\u0016\u0005\u0006U\u0002!\tA\u0016\u0005\u0006Y\u0002!\tA\u0016\u0005\u0006]\u0002!\tA\u0016\u0005\u0006a\u0002!\tA\u0016\u0005\u0006e\u0002!\tA\u0016\u0005\u0006i\u0002!\tA\u0016\u0005\u0006m\u0002!\tA\u0016\u0005\u0006q\u0002!\tA\u0016\u0005\u0006u\u0002!\tA\u0016\u0005\u0006y\u0002!\tA\u0016\u0005\u0006}\u0002!\tA\u0016\u0005\u0007\u0003\u0003\u0001A\u0011\u0001,\t\r\u0005\u0015\u0001\u0001\"\u0001W\u0011\u0019\tI\u0001\u0001C\u0001-\"1\u0011Q\u0002\u0001\u0005\u0002YCa!!\u0005\u0001\t\u00031\u0006BBA\u000b\u0001\u0011\u0005a\u000b\u0003\u0004\u0002\u001a\u0001!\tA\u0016\u0005\u0007\u0003;\u0001A\u0011\u0001,\t\r\u0005\u0005\u0002\u0001\"\u0001W\u0011\u0019\t)\u0003\u0001C\u0001-\"1\u0011\u0011\u0006\u0001\u0005\u0002YCa!!\f\u0001\t\u00031\u0006BBA\u0018\u0001\u0011\u0005a\u000b\u0003\u0004\u00024\u0001!\tA\u0016\u0005\u0007\u0003o\u0001A\u0011\u0001,\t\r\u0005m\u0002\u0001\"\u0001W\u0011\u0019\ty\u0004\u0001C\u0001-\"1\u00111\t\u0001\u0005\u0002YCa!a\u0012\u0001\t\u00031\u0006BBA&\u0001\u0011\u0005a\u000b\u0003\u0004\u0002P\u0001!\tA\u0016\u0005\u0007\u0003'\u0002A\u0011\u0001,\t\r\u0005]\u0003\u0001\"\u0001W\u0011\u0019\tY\u0006\u0001C\u0001-\"1\u0011q\f\u0001\u0005\u0002YCa!a\u0019\u0001\t\u00031\u0006BBA4\u0001\u0011\u0005a\u000b\u0003\u0004\u0002l\u0001!\tA\u0016\u0005\u0007\u0003_\u0002A\u0011\u0001,\t\r\u0005M\u0004\u0001\"\u0001W\u0011\u0019\t9\b\u0001C\u0001-\"1\u00111\u0010\u0001\u0005\u0002YCa!a 
\u0001\t\u00031&\u0001\u0003*b].$Vm\u001d;\u000b\u0005Q*\u0014aA:rY*\u0011agN\u0001\u0007gR\u0014X-Y7\u000b\u0005aJ\u0014\u0001\u00029mC:T!AO\u001e\u0002\u000fAd\u0017M\u001c8fe*\u0011A(P\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003}}\nQA\u001a7j].T!\u0001Q!\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0011\u0015aA8sO\u000e\u00011C\u0001\u0001F!\t1\u0015*D\u0001H\u0015\tA\u0015(A\u0003vi&d7/\u0003\u0002K\u000f\niA+\u00192mKR+7\u000f\u001e\"bg\u0016\fa\u0001P5oSRtD#A'\u0011\u00059\u0003Q\"A\u001a\u0002\tU$\u0018\u000e\\\u000b\u0002#B\u0011aIU\u0005\u0003'\u001e\u00131c\u0015;sK\u0006lG+\u00192mKR+7\u000f^+uS2\fQ!\u001e;jY\u0002\n\u0001\u0004^3tiJ\u000bgn[#oI6+8\u000f^*qK\u000eLg-[3e)\u00059\u0006C\u0001-\\\u001b\u0005I&\"\u0001.\u0002\u000bM\u001c\u0017\r\\1\n\u0005qK&\u0001B+oSRD#\u0001\u00020\u0011\u0005}3W\"\u00011\u000b\u0005\u0005\u0014\u0017aA1qS*\u00111\rZ\u0001\bUV\u0004\u0018\u000e^3s\u0015\t)\u0017)A\u0003kk:LG/\u0003\u0002hA\n!A+Z:u\u0003]!Xm\u001d;SC:\\WI\u001c3MKN\u001cH\u000b[1o5\u0016\u0014x\u000e\u000b\u0002\u0006=\u0006!B/Z:u%\u0006t7.\u00128e\u0019\u0016\u001c8\u000f\u00165b]FB#A\u00020\u00021Q,7\u000f\u001e*b].4UO\\2uS>t\u0017J\\'jI\u0012dW\r\u000b\u0002\b=\u0006\u0019D/Z:u%><h*^7cKJ<\u0016\u000e\u001e5SC:\\WI\u001c3MKN\u001cH\u000b[1oc=\u0013H-\u001a:CsB\u0013xn\u0019;j[\u0016\f5o\u0019\u0015\u0003\u0011y\u000bA\u0007^3tiJ{wOT;nE\u0016\u0014x+\u001b;i%\u0006t7.\u00128e\u0019\u0016\u001c8\u000f\u00165b]Fz%\u000fZ3s\u0005f\u0004&o\\2uS6,G)Z:dQ\tIa,\u0001\u001auKN$(k\\<Ok6\u0014WM],ji\"\u0014\u0016M\\6F]\u0012dUm]:UQ\u0006t\u0017g\u0014:eKJ\u0014\u0015PU8xi&lW-Q:dQ\tQa,A\u001auKN$(k\\<Ok6\u0014WM],ji\"\u0014\u0016M\\6F]\u0012dUm]:UQ\u0006t\u0017g\u0014:eKJ\u0014\u0015PU8xi&lW\rR3tG\"\u00121BX\u0001/i\u0016\u001cHOU1oW^KG\u000f\u001b*b].,e\u000e\u001a'fgN$\u0006.\u001982\u001fJ$WM\u001d\"z!J|7\r^5nK\u0006\u001b8\r\u000b\u0002\r=\u0006yC/Z:u%\u0006t7nV5uQJ\u000bgn[#oI2+7o\u001d+iC:\ftJ\u001d3fe\nK\bK]8di&lW\rR3tG\"\u0012QBX\u0001\u001ci\u0016\u001cH
OU8x\u001dVl'-\u001a:XSRDw*\u001e;Pe\u0012,'OQ=)\u00059q\u0016A\u0006;fgR\u0014\u0016M\\6XSRDw*\u001e;Pe\u0012,'OQ=)\u0005=q\u0016a\u0007;fgR$UM\\:f%\u0006t7nV5uQ>+Ho\u0014:eKJ\u0014\u0015\u0010\u000b\u0002\u0011=\u0006aB/Z:u%><h*^7cKJ<\u0016\u000e\u001e5Nk2$\u0018n\u0012:pkB\u001c\bFA\t_\u0003]!Xm\u001d;SC:\\w+\u001b;i\u001bVdG/[$s_V\u00048\u000f\u000b\u0002\u0013=\u0006aB/Z:u\t\u0016t7/\u001a*b].<\u0016\u000e\u001e5Nk2$\u0018n\u0012:pkB\u001c\bFA\n_\u0003!!Xm\u001d;U_Bt\u0005F\u0001\u000b_\u0003%!Xm\u001d;U_Bt%\u0007\u000b\u0002\u0016=\u0006QA/Z:u)>\u0004h\n\u001e5)\u0005Yq\u0016A\u0005;fgR$v\u000e\u001d(XSRDg)\u001b7uKJD#a\u00060\u0002!Q,7\u000f\u001e+pa:\u000be\r^3s\u0003\u001e<\u0007F\u0001\r_\u0003Y!Xm\u001d;U_Btu+\u001b;i\u0017\u0016L8\t[1oO\u0016$\u0007FA\r_\u0003e!Xm\u001d;V]\u0006\u0014\u0018pU8siR{\u0007OT(o'R\u0014\u0018N\\4)\u0005iq\u0016\u0001\u0006;fgR$v\u000e\u001d(Pe\u0012,'OQ=D_VtG\u000f\u000b\u0002\u001c=\u0006YB/Z:u%><h*^7cKJ<\u0016\u000e\u001e5pkR|%\u000fZ3s\u0005f\f\u0001\u0005^3tiJ{wOT;nE\u0016\u0014x+\u001b;i\u001fJ$WM\u001d\"z\u0007>t7\u000f^1oi\"\u0012QDX\u0001\u001bi\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:CsN+XnV5uQ\u000e{g\u000e\u001a\u0015\u0003=y\u000ba\u0004^3tiR{\u0007OT(sI\u0016\u0014()_*v[^KG\u000f[\"bg\u0016<\u0006.\u001a8)\u0005}q\u0016\u0001\u0007;fgR$v\u000e\u001d(Pe\u0012,'OQ=Tk6<\u0016\u000e\u001e5JM\"\u0012\u0001EX\u0001#i\u0016\u001cH\u000fV8q\u001d>\u0013H-\u001a:CsN+XnV5uQ\u001aKG\u000e^3s\u00072\fWo]3)\u0005\u0005r\u0016a\t;fgR$v\u000e\u001d(Pe\u0012,'OQ=Tk6<\u0016\u000e\u001e5GS2$XM]\"mCV\u001cXM\r\u0015\u0003Ey\u000b\u0011\u0005^3tiR{\u0007OT(sI\u0016\u0014()_\"pk:$\u0018I\u001c3Pi\",'OR5fY\u0012D#a\t0\u0002=Q,7\u000f\u001e+pa:;\u0016\u000e\u001e5He>,\bOQ=D_:\u001cH/\u00198u\u0017\u0016L\bF\u0001\u0013_\u00039!Xm\u001d;OKN$X\r\u001a+pa:C#!\n0\u0002/Q,7\u000f\u001e+pa:3uN\u001d,be&\f'\r\\3TSj,\u0007F\u0001\u0014_\u0003m!Xm\u001d;De\u0016\fG/\u001a,jK^<\u0016\u000e\u001e5S_^tU/\u001c2fe\"\u0012qEX\u0001\u0018i\u0016\u
001cHoQ8se\u0016d\u0017\r^3T_J$Hk\u001c*b].D#\u0001\u000b0\u0002YQ,7\u000f^\"peJ,G.\u0019;f'>\u0014H\u000fV8SC:\\w+\u001b;i\u001bVdG/\u001b9mK\u001e\u0013x.\u001e9LKf\u001c\bFA\u0015_\u0003y!Xm\u001d;SC:\\w+\u001b;i\u0003:|G\u000f[3s%\u0006t7.Q:J]B,H\u000f\u000b\u0002+=\u0006\u0019C/Z:u%\u0016$WO\u001c3b]R\u0014\u0016M\\6Ok6\u0014WM]\"pYVlgNU3n_Z,\u0007FA\u0016_\u0003\u0001\"Xm\u001d;Va\u0012\fG/\u00192mKJ\u000bgn[,ji\"$U\rZ;qY&\u001c\u0017\r^3)\u00051r\u0016\u0001\t;fgR,\u0006\u000fZ1uC\ndWMU1oW\u00063G/\u001a:M_>\\W\u000f\u001d&pS:D#!\f0\u0002MQ,7\u000f^+qI\u0006$\u0018M\u00197f%\u0006t7.\u00114uKJLe\u000e^3s[\u0016$\u0017.\u0019;f'\u000e\fg\u000e\u000b\u0002/=\u0006)C/Z:u%\u0006t7nT;uaV$X\u000b]:feR\\U-\u001f(pi6\u000bGo\u00195TS:\\\u0007k\u001b\u0015\u0003_y\u000bq\u0004^3tiJ\u000bgn[(viB,H/\u00169tKJ$8*Z=J]NKgn\u001b)lQ\t\u0001d,A\u0013uKN$(+\u00198l\u001fV$\b/\u001e;M_N$X\u000b]:feR\\U-_,ji\"\u001c\u0016N\\6QW\"\u0012\u0011G\u0018")
public class RankTest
extends TableTestBase {
    // Stream test utility shared by the tests of this instance; built once at construction time
    // by streamTestUtil(...) using that method's default first argument.
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    /** Returns the {@link StreamTableTestUtil} held in the {@code util} field. */
    private StreamTableTestUtil util() {
        return this.util;
    }

    /**
     * Expects exec-plan verification to fail for a ROW_NUMBER query whose filter only gives a lower
     * bound ({@code rank_num >= 10}).
     *
     * <p>The thrown exception must be a {@link TableException} whose message contains
     * "Rank end is not specified.". The previous form evaluated {@code instanceof} on the assertion
     * object and discarded the result, so the exception type was never actually checked.
     */
    @Test
    public void testRankEndMustSpecified() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num >= 10\n      ")).stripMargin();
        Assertions.assertThatThrownBy(() -> this.util().verifyExecPlan(sql))
                .hasMessageContaining("Rank end is not specified.")
                // Assert the type of the thrown exception, not of the assertion object.
                .isInstanceOf(TableException.class);
    }

    /**
     * Expects exec-plan verification to fail for a ROW_NUMBER query filtered with
     * {@code rank_num <= 0}.
     *
     * <p>The thrown exception must be a {@link TableException} whose message contains
     * "Rank end should not less than zero". The previous form evaluated {@code instanceof} on the
     * assertion object and discarded the result, so the exception type was never actually checked.
     */
    @Test
    public void testRankEndLessThanZero() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as rank_num\n        |  FROM MyTable)\n        |WHERE rank_num <= 0\n      ")).stripMargin();
        Assertions.assertThatThrownBy(() -> this.util().verifyExecPlan(sql))
                .hasMessageContaining("Rank end should not less than zero")
                // Assert the type of the thrown exception, not of the assertion object.
                .isInstanceOf(TableException.class);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query (partitioned by {@code a}, ordered by {@code b}
     * descending) that keeps only rows with {@code row_num <= 1}.
     */
    @Test
    public void testRankEndLessThan1() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a query where the RANK() column {@code rk} sits between other
     * selected columns and is filtered with {@code rk < 10}.
     */
    @Test
    public void testRankFunctionInMiddle() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        | SELECT a, RANK() OVER (PARTITION BY a ORDER BY a) rk, b, c FROM MyTable) t\n        |WHERE rk < 10\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by {@code proctime ASC} that keeps only
     * rows with {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeAsc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by {@code proctime DESC} that keeps only
     * rows with {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByProctimeDesc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by {@code rowtime ASC} that keeps only
     * rows with {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeAsc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime ASC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by {@code rowtime DESC} that keeps only
     * rows with {@code row_num <= 1}.
     */
    @Test
    public void testRowNumberWithRankEndLessThan1OrderByRowtimeDesc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, rowtime,\n        |       ROW_NUMBER() OVER (PARTITION BY a ORDER BY rowtime DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a RANK() query ordered by {@code proctime ASC} that keeps only rows
     * with {@code rk <= 1}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeAsc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime ASC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a RANK() query ordered by {@code proctime DESC} that keeps only rows
     * with {@code rk <= 1}.
     */
    @Test
    public void testRankWithRankEndLessThan1OrderByProctimeDesc() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT a, b, c, proctime,\n        |       RANK() OVER (PARTITION BY a ORDER BY proctime DESC) as rk\n        |  FROM MyTable)\n        |WHERE rk <= 1\n      "));
        final String query = marginedQuery.stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Expects a {@link RuntimeException} when verifying the exec plan of a ROW_NUMBER query whose
     * OVER clause has a PARTITION BY but no ORDER BY.
     */
    @Test
    public void testRowNumberWithOutOrderBy() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when verifying the exec plan of a RANK() query whose
     * OVER clause has a PARTITION BY but no ORDER BY.
     */
    @Test
    public void testRankWithOutOrderBy() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when verifying the exec plan of a DENSE_RANK() query
     * whose OVER clause has a PARTITION BY but no ORDER BY.
     */
    @Test
    public void testDenseRankWithOutOrderBy() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b) as rk\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link RuntimeException} when verifying the exec plan of a query with two
     * ROW_NUMBER windows over different partitions, the second of which has no ORDER BY.
     */
    @Test
    public void testRowNumberWithMultiGroups() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, ROW_NUMBER() OVER (PARTITION BY b ORDER BY a) as row_num,\n        |         ROW_NUMBER() OVER (PARTITION BY a) as row_num1\n        |  FROM MyTable)\n        |WHERE row_num <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(RuntimeException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when verifying the exec plan of a query with two RANK()
     * windows over different partitions, the second of which has no ORDER BY.
     */
    @Test
    public void testRankWithMultiGroups() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Expects a {@link ValidationException} when verifying the exec plan of a query with two
     * DENSE_RANK() windows over different partitions, the second of which has no ORDER BY.
     */
    @Test
    public void testDenseRankWithMultiGroups() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, DENSE_RANK() OVER (PARTITION BY b ORDER BY a) as rk,\n        |         DENSE_RANK() OVER (PARTITION BY a) as rk1\n        |  FROM MyTable)\n        |WHERE rk <= a\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> this.util().verifyExecPlan(query));
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * filtered with {@code row_num <= 10}.
     */
    @Test
    public void testTopN() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num <= 10\n      "));
        final String query = marginedQuery.stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query whose
     * filter is written with the literal on the left ({@code 10 >= row_num}).
     */
    @Test
    public void testTopN2() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE 10 >= row_num\n      "));
        final String query = marginedQuery.stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * filtered with the equality {@code row_num = 10}.
     */
    @Test
    public void testTopNth() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable)\n        |WHERE row_num = 10\n      "));
        final String query = marginedQuery.stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query that
     * filters both inside the subquery ({@code c > 1000}) and outside it
     * ({@code row_num <= 10 AND b IS NOT NULL}).
     */
    @Test
    public void testTopNWithFilter() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT row_num, a, c\n        |FROM (\n        |  SELECT a, b, c,\n        |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) as row_num\n        |  FROM MyTable\n        |  WHERE c > 1000)\n        |WHERE row_num <= 10 AND b IS NOT NULL\n      "));
        final String query = marginedQuery.stripMargin();
        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * (filtered with {@code row_num <= 10}) built on top of a {@code GROUP BY a, b} aggregation.
     */
    @Test
    public void testTopNAfterAgg() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * partitioned by {@code b}, where the input aggregation groups by {@code a} only and derives
     * {@code b} via {@code last_value(b)}.
     */
    @Test
    public void testTopNWithKeyChanged() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, last_value(b) as b, SUM(c) as sum_c\n        |FROM MyTable\n        |GROUP BY a\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Registers a table source {@code T} with fields {@code category}, {@code shopId} and
     * {@code price}, then verifies the rel plan (with {@link ExplainDetail#CHANGELOG_MODE}) of a
     * ROW_NUMBER query that ranks {@code MAX(price)} per category ascending and keeps
     * {@code row_num <= 3}.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} below is decompiler output and
     * contains constructs that are not valid Java source (see inline notes); presumably it
     * corresponds to compiler-generated type information for a {@code (String, Int, String)} tuple
     * - confirm against the Scala source.
     */
    @Test
    public void testUnarySortTopNOnString() {
        this.util().addTableSource("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "category")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "shopId")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "price"))}), new CaseClassTypeInfo<Tuple3<String, Object, String>>(null){

            // Synthetic accessor exposing the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) and wraps them in a
            // ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<String, Object, String>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // NOTE(review): `fieldSerializers$2` is a decompiler artifact; presumably it refers
                // to the captured `fieldSerializers` array - confirm.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // This subclass instance (which overrides createInstance) is assigned to `unused`
                // and never read; the method returns a plain ScalaCaseClassSerializer instead.
                ScalaCaseClassSerializer<Tuple3<String, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<String, Object, String>>(this, fieldSerializers){

                    // Creates a Tuple3 from fields[0] (String), fields[1] (unboxed and re-boxed
                    // as int) and fields[2] (String).
                    public Tuple3<String, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)((String)fields[0]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // NOTE(review): lambda-deserialization hook as emitted by the decompiler; the
            // MethodHandle[] initializer below is not valid Java syntax.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$3 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT category, shopId, max_price,\n        |      ROW_NUMBER() OVER (PARTITION BY category ORDER BY max_price ASC) as row_num\n        |  FROM (\n        |     SELECT category, shopId, MAX(price) as max_price\n        |     FROM T\n        |     GROUP BY category, shopId\n        |  ))\n        |WHERE row_num <= 3\n      ")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of {@code SELECT max(a)}
     * over a ROW_NUMBER query that is ordered by a {@code COUNT(*)} aggregate and filtered with
     * {@code row_num <= 10}.
     */
    @Test
    public void testTopNOrderByCount() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Ranking query wrapped around the aggregation.
        final StringBuilder rankText = new StringBuilder(212);
        rankText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC) as row_num\n         |  FROM (");
        rankText.append(aggQuery);
        rankText.append("))\n         |WHERE row_num <= 10\n      ");
        final String rankQuery = new StringOps(Predef$.MODULE$.augmentString(rankText.toString())).stripMargin();

        // Outer max(a) wrapped around the ranking query.
        final StringBuilder outerText = new StringBuilder(40);
        outerText.append("\n         |SELECT max(a) FROM (");
        outerText.append(rankQuery);
        outerText.append(")\n       ");
        final String outerQuery = new StringOps(Predef$.MODULE$.augmentString(outerText.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(outerQuery, details);
    }

    /**
     * Expects {@code executeSql} to fail for a ROW_NUMBER() whose OVER clause has a PARTITION BY but
     * no ORDER BY, with the root cause message asserted below.
     *
     * <p>This method previously lacked {@code @Test}, unlike every other test in this class, so
     * JUnit never ran it.
     */
    @Test
    public void testRowNumberWithoutOrderBy() {
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT ROW_NUMBER() over (partition by a) FROM MyTable\n      ")).stripMargin();
        Assertions.assertThatThrownBy(() -> this.util().tableEnv().executeSql(sqlQuery)).hasRootCauseMessage("Over Agg: The window rank function requires order by clause with non-constant fields. please re-check the over window statement.");
    }

    /**
     * Expects {@code executeSql} to fail, with the root cause message asserted below, for a
     * ROW_NUMBER() ordered by {@code key}, a column defined as the string literal
     * {@code '2023-03-29'}.
     */
    @Test
    public void testRowNumberWithOrderByConstant() {
        final StringOps marginedQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM (\n        |  SELECT a, b,\n        |  ROW_NUMBER() OVER (PARTITION BY b ORDER BY key) AS row_num\n        |  FROM (\n        |  SELECT *, '2023-03-29' AS key\n        |  FROM MyTable\n        |  ) tmp)\n        |WHERE row_num <= 10\n      "));
        final String query = marginedQuery.stripMargin();
        Assertions
                .assertThatThrownBy(() -> this.util().tableEnv().executeSql(query))
                .hasRootCauseMessage("Over Agg: The window rank function requires order by clause with non-constant fields. please re-check the over window statement.");
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * ordered by {@code SUM(c)}, where the aggregation input is filtered with {@code c >= 0}.
     */
    @Test
    public void testTopNOrderBySumWithCond() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) AS sum_c\n        |FROM MyTable\n        |WHERE c >= 0\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by a {@code SUM} over a
     * {@code CASE WHEN} expression and filtered with {@code row_num <= 10}.
     */
    @Test
    public void testTopNOrderBySumWithCaseWhen() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(CASE WHEN c > 10 THEN 1 WHEN c < 0 THEN 0 ELSE null END) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered by {@code SUM(IF(c > 10, 1, 0))} and
     * filtered with {@code row_num <= 10}.
     */
    @Test
    public void testTopNOrderBySumWithIf() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(IF(c > 10, 1, 0)) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) as row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered descending by a {@code SUM(c)} that
     * carries an aggregate filter clause ({@code c >= 0 and a < 0}).
     */
    @Test
    public void testTopNOrderBySumWithFilterClause() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) filter (where c >= 0 and a < 0) as sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(208);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c DESC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of a ROW_NUMBER query ordered ascending by a {@code SUM(c)} that
     * carries an aggregate FILTER clause ({@code c <= 0 AND a < 0}).
     */
    @Test
    public void testTopNOrderBySumWithFilterClause2() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, SUM(c) FILTER (WHERE c <= 0 AND a < 0) AS sum_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(207);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, sum_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY sum_c ASC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query
     * ordered by {@code count_c DESC, a ASC} on top of a {@code COUNT(*)} aggregation.
     */
    @Test
    public void testTopNOrderByCountAndOtherField() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM MyTable\n        |GROUP BY a, b\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(219);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY count_c DESC, a ASC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of a ROW_NUMBER query on
     * top of an aggregation whose GROUP BY includes {@code cn}, a column defined as the literal
     * {@code 'cn'}.
     */
    @Test
    public void testTopNWithGroupByConstantKey() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) AS count_c\n        |FROM (\n        |SELECT *, 'cn' AS cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();

        // Embed the aggregation as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(212);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= 10\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(query, details);
    }

    /**
     * Verifies the rel plan, with {@link ExplainDetail#CHANGELOG_MODE}, of two nested ROW_NUMBER
     * queries: an inner one partitioned by {@code a} and an outer one without PARTITION BY, both
     * ordered by {@code count_c DESC} and limited to 10.
     */
    @Test
    public void testNestedTopN() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, COUNT(*) as count_c\n        |FROM (\n        |SELECT *, 'cn' as cn\n        |FROM MyTable\n        |)\n        |GROUP BY a, b, cn\n      ")).stripMargin();

        // Inner ranking: per-partition row number over the aggregation.
        final StringBuilder innerText = new StringBuilder(212);
        innerText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |      ROW_NUMBER() OVER (PARTITION BY a ORDER BY count_c DESC) AS row_num\n         |  FROM (");
        innerText.append(aggQuery);
        innerText.append("))\n         |WHERE row_num <= 10\n      ");
        final String innerQuery = new StringOps(Predef$.MODULE$.augmentString(innerText.toString())).stripMargin();

        // Outer ranking: row number over the inner ranking's result.
        final StringBuilder outerText = new StringBuilder(197);
        outerText.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, count_c,\n         |    ROW_NUMBER() OVER (ORDER BY count_c DESC) as rank_num\n         |  FROM (");
        outerText.append(innerQuery);
        outerText.append("))\n         |WHERE rank_num <= 10\n      ");
        final String outerQuery = new StringOps(Predef$.MODULE$.augmentString(outerText.toString())).stripMargin();

        final Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyRelPlan(outerQuery, details);
    }

    /**
     * Expects a {@link ValidationException} when verifying the rel plan of a ROW_NUMBER query whose
     * rank bound is a column ({@code row_num <= a}) and whose subquery calls {@code add(max_c)}.
     */
    @Test
    public void testTopNForVariableSize() {
        final String aggQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT a, b, add(max_c) as c\n        |FROM (\n        |  SELECT MAX(a) as a, b, MAX(c) as max_c\n        |  FROM MyTable\n        |  GROUP BY b\n        |)\n      ")).stripMargin();

        // Embed the subquery as the innermost FROM clause of the ranking query.
        final StringBuilder text = new StringBuilder(199);
        text.append("\n         |SELECT *\n         |FROM (\n         |  SELECT a, b, c,\n         |      ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) as row_num\n         |  FROM (");
        text.append(aggQuery);
        text.append("))\n         |WHERE row_num <= a\n      ");
        final String query = new StringOps(Predef$.MODULE$.augmentString(text.toString())).stripMargin();

        Assertions
                .assertThatExceptionOfType(ValidationException.class)
                .isThrownBy(() -> {
                    Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
                    this.util().verifyRelPlan(query, details);
                });
    }

    /**
     * Creates a source table, an aggregating view ({@code view1}), a ROW_NUMBER view on top of it
     * ({@code view2}) and a sink table, then verifies the exec plan of an INSERT that selects from
     * {@code view2} with {@code row_num <= 3}.
     */
    @Test
    public void testCreateViewWithRowNumber() {
        final String sourceDdl = new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE test_source (\n                    |  name STRING,\n                    |  eat STRING,\n                    |  age BIGINT\n                    |) WITH (\n                    |  'connector' = 'values',\n                    |  'bounded' = 'false'\n                    |)\n      ")).stripMargin();
        final String aggViewDdl = "create view view1 as select name, eat ,sum(age) as cnt\nfrom test_source group by name, eat";
        final String rankViewDdl = "create view view2 as\nselect *, ROW_NUMBER() OVER (PARTITION BY name ORDER BY cnt DESC) as row_num\nfrom view1";
        final String sinkDdl = new StringOps(Predef$.MODULE$.augmentString("\n         |create table sink (\n         |  name varchar,\n         |  eat varchar,\n         |  cnt bigint\n         |)\n         |with(\n         |  'connector' = 'print'\n         |)\n         |")).stripMargin();
        final String insertDml = "insert into sink select name, eat, cnt\nfrom view2 where row_num <= 3";

        // Statements run in the same order as before: source, views, sink, then the insert.
        this.util().addTable(sourceDdl);
        this.util().tableEnv().executeSql(aggViewDdl);
        this.util().tableEnv().executeSql(rankViewDdl);
        this.util().addTable(sinkDdl);
        this.util().verifyExecPlanInsert(insertDml);
    }

    /**
     * Verifies the exec plan of a query joining the distinct values of {@code a} with a correlated
     * LATERAL subquery over {@code MyTable} that is ordered by {@code c} descending and limited to
     * 3 rows.
     */
    @Test
    public void testCorrelateSortToRank() {
        String lateralTopN =
                "\n"
                        + "         |SELECT a, b\n"
                        + "         |FROM\n"
                        + "         |  (SELECT DISTINCT a FROM MyTable) T1,\n"
                        + "         |  LATERAL (\n"
                        + "         |    SELECT b, c\n"
                        + "         |    FROM MyTable\n"
                        + "         |    WHERE a = T1.a\n"
                        + "         |    ORDER BY c\n"
                        + "         |    DESC LIMIT 3\n"
                        + "         |  )\n"
                        + "      ";
        this.util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString(lateralTopN)).stripMargin());
    }

    /**
     * Registers a data stream table {@code T} with fields {@code a, b, c, d} plus {@code proctime}
     * and {@code rowtime} attributes, then verifies the exec plan of a query that joins the
     * distinct {@code (a, b)} pairs of {@code T} with a correlated LATERAL subquery ordered by
     * {@code d} descending and limited to 3 rows.
     *
     * <p>NOTE(review): this is decompiler output; the anonymous type-info class below contains
     * fragments that are not valid Java source.
     */
    @Test
    public void testCorrelateSortToRankWithMultipleGroupKeys() {
        // Type information for the stream is an anonymous CaseClassTypeInfo over
        // Tuple4<Object, String, Object, Object>.
        this.util().addDataStream("T", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple4<Object, String, Object, Object>>(null){

            // Synthetic accessor: returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (index 0 until getArity()) from the field type
            // information and returns a ScalaCaseClassSerializer built from them.
            public TypeSerializer<Tuple4<Object, String, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$3` looks like the decompiler's name for the
                    // captured `fieldSerializers` array above - confirm against the Scala source.
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // Anonymous serializer whose createInstance rebuilds a Tuple4 from
                // (int, String, long, long) field values. It is assigned to `unused` and not
                // referenced again in this method.
                ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>>(this, fieldSerializers){

                    public Tuple4<Object, String, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                // NOTE(review): the returned serializer is a plain ScalaCaseClassSerializer, not
                // the anonymous one above - presumably a decompilation artifact; verify.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array content is the
            // decompiler's rendering of the lambda target and is not valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$5 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Correlated on both a and b, so the subquery is evaluated per distinct (a, b) pair.
        String query = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT a, b, c\n         |FROM\n         |  (SELECT DISTINCT a, b FROM T) T1,\n         |  LATERAL (\n         |    SELECT c, d\n         |    FROM T\n         |    WHERE a = T1.a and b = T1.b\n         |    ORDER BY d\n         |    DESC LIMIT 3\n         |  )\n      ")).stripMargin();
        this.util().verifyExecPlan(query);
    }

    /**
     * Verifies the exec plan of two stacked ROW_NUMBER filters: an inner rank partitioned by
     * {@code (a, c)} kept for {@code rna <= 100}, feeding an outer rank partitioned by {@code a}
     * kept for {@code rnb <= 200}.
     */
    @Test
    public void testRankWithAnotherRankAsInput() {
        String nestedRankSql =
                "\n"
                        + "        |SELECT CAST(rna AS INT) AS rn1, CAST(rnb AS INT) AS rn2 FROM (\n"
                        + "        |  SELECT *, row_number() over (partition by a order by b desc) AS rnb\n"
                        + "        |  FROM (\n"
                        + "        |    SELECT *, row_number() over (partition by a, c order by b desc) AS rna\n"
                        + "        |    FROM MyTable\n"
                        + "        |  )\n"
                        + "        |  WHERE rna <= 100\n"
                        + "        |)\n"
                        + "        |WHERE rnb <= 200\n"
                        + "        |";
        this.util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString(nestedRankSql)).stripMargin());
    }

    /**
     * Registers a data stream table {@code MyTable1} with fields {@code uri, reqcount,
     * start_time, bucket_id}, then verifies the exec plan of a query with two nested ROW_NUMBER
     * filters ({@code rownum_1} and {@code rownum_2}, both {@code <= 100000}) whose outermost
     * projection selects neither rank number.
     *
     * <p>NOTE(review): this is decompiler output; the anonymous type-info class below contains
     * fragments that are not valid Java source.
     */
    @Test
    public void testRedundantRankNumberColumnRemove() {
        // Type information for the stream is an anonymous CaseClassTypeInfo over
        // Tuple4<String, Object, Object, Object>.
        this.util().addDataStream("MyTable1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "uri")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "reqcount")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "start_time")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "bucket_id"))}), new CaseClassTypeInfo<Tuple4<String, Object, Object, Object>>(null){

            // Synthetic accessor: returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (index 0 until getArity()) from the field type
            // information and returns a ScalaCaseClassSerializer built from them.
            public TypeSerializer<Tuple4<String, Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$4` looks like the decompiler's name for the
                    // captured `fieldSerializers` array above - confirm against the Scala source.
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // Anonymous serializer whose createInstance rebuilds a Tuple4 from
                // (String, long, long, long) field values. It is assigned to `unused` and not
                // referenced again in this method.
                ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<String, Object, Object, Object>>(this, fieldSerializers){

                    public Tuple4<String, Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)((String)fields[0]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[3])));
                    }
                };
                // NOTE(review): the returned serializer is a plain ScalaCaseClassSerializer, not
                // the anonymous one above - presumably a decompilation artifact; verify.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array content is the
            // decompiler's rendering of the lambda target and is not valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$7 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Inner rank partitions by (start_time, bucket_id); the rank over it partitions by
        // start_time only. The final SELECT projects uri, reqcount and start_time.
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  CONCAT('http://txmov2.a.yximgs.com', uri) AS url,\n        |  reqcount AS download_count,\n        |  start_time AS `timestamp`\n        |FROM\n        |  (\n        |    SELECT\n        |      uri,\n        |      reqcount,\n        |      rownum_2,\n        |      start_time\n        |    FROM\n        |      (\n        |        SELECT\n        |          uri,\n        |          reqcount,\n        |          start_time,\n        |          ROW_NUMBER() OVER (\n        |            PARTITION BY start_time\n        |            ORDER BY\n        |              reqcount DESC\n        |          ) AS rownum_2\n        |        FROM\n        |          (\n        |            SELECT\n        |            uri,\n        |            reqcount,\n        |            start_time,\n        |            ROW_NUMBER() OVER (\n        |                PARTITION BY start_time, bucket_id\n        |                ORDER BY\n        |                reqcount DESC\n        |            ) AS rownum_1\n        |            FROM MyTable1\n        |          )\n        |        WHERE\n        |          rownum_1 <= 100000\n        |      )\n        |    WHERE\n        |      rownum_2 <= 100000\n        |  )\n        |")).stripMargin();
        this.util().verifyExecPlan(sql);
    }

    /**
     * Creates view {@code v0} (first row per {@code c} ordered by {@code PROCTIME()}) and view
     * {@code v1} (filtered SUM grouped by {@code c, b}) on top of it, then verifies the exec plan
     * of a ROW_NUMBER query over {@code v1} that keeps {@code rn < 10}.
     */
    @Test
    public void testUpdatableRankWithDeduplicate() {
        String deduplicateViewDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE VIEW v0 AS\n"
                        + "                               |SELECT *\n"
                        + "                               |FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY `c`\n"
                        + "                               |        ORDER BY `PROCTIME`()) AS `rowNum`\n"
                        + "                               |        FROM MyTable)\n"
                        + "                               |WHERE `rowNum` = 1\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(deduplicateViewDdl);

        String aggregateViewDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE VIEW v1 AS\n"
                        + "                               |SELECT c, b, SUM(a) FILTER (WHERE a > 0) AS d FROM v0 GROUP BY c, b\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(aggregateViewDdl);

        String rankSql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |SELECT c, b, d\n"
                        + "        |FROM (\n"
                        + "        |    SELECT\n"
                        + "        |       c, b, d,\n"
                        + "        |       ROW_NUMBER() OVER (PARTITION BY c, b ORDER BY d DESC) AS rn FROM v1\n"
                        + "        |) WHERE rn < 10\n"
                        + "        |")).stripMargin();
        this.util().verifyExecPlan(rankSql);
    }

    /**
     * Creates {@code LookupTable} and a view {@code V1} that joins {@code MyTable} with it
     * {@code FOR SYSTEM_TIME AS OF T.proctime}, then verifies the rel plan of a ROW_NUMBER query
     * over a filtered SUM grouped by {@code name}, keeping {@code rank_num <= 3}.
     */
    @Test
    public void testUpdatableRankAfterLookupJoin() {
        String lookupTableDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                     |CREATE TABLE LookupTable (\n"
                        + "                     |  `id` INT,\n"
                        + "                     |  `name` STRING,\n"
                        + "                     |  `age` INT\n"
                        + "                     |) WITH (\n"
                        + "                     |  'connector' = 'values'\n"
                        + "                     |)\n"
                        + "                     |")).stripMargin();
        this.util().addTable(lookupTableDdl);

        String joinViewDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |CREATE VIEW V1 AS\n"
                        + "        |SELECT *\n"
                        + "        |FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
                        + "        |ON T.a = D.id\n"
                        + "        |")).stripMargin();
        this.util().tableEnv().executeSql(joinViewDdl);

        String rankSql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "         |SELECT *\n"
                        + "         |FROM (\n"
                        + "         |  SELECT name, ids,\n"
                        + "         |      ROW_NUMBER() OVER (PARTITION BY name ORDER BY ids DESC) as rank_num\n"
                        + "         |  FROM (\n"
                        + "         |     SELECT name, SUM(id) FILTER (WHERE id > 0) as ids\n"
                        + "         |     FROM V1\n"
                        + "         |     GROUP BY name\n"
                        + "         |  ))\n"
                        + "         |WHERE rank_num <= 3\n"
                        + "         |")).stripMargin();
        this.util().verifyRelPlan(rankSql);
    }

    /**
     * Sets {@code TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED} to true, creates the
     * aggregating view {@code v1} and a primary-keyed {@code sink}, then verifies the exec plan of
     * a statement set with two inserts into {@code sink}: all of {@code v1}, and the rows of
     * {@code v1} with {@code rn < 3} per {@code a}.
     */
    @Test
    public void testUpdatableRankAfterIntermediateScan() {
        this.util().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));

        String aggregateViewDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE VIEW v1 AS\n"
                        + "                               |SELECT a, MAX(b) AS b, MIN(c) AS c\n"
                        + "                               |FROM MyTable GROUP BY a\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(aggregateViewDdl);

        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                     |CREATE TABLE sink(\n"
                        + "                     |  `id` INT,\n"
                        + "                     |  `name` STRING,\n"
                        + "                     |  `age` BIGINT,\n"
                        + "                     |   primary key (id) not enforced\n"
                        + "                     |) WITH (\n"
                        + "                     |  'connector' = 'values',\n"
                        + "                     |  'sink-insert-only' = 'false'\n"
                        + "                     |)\n"
                        + "                     |")).stripMargin();
        this.util().addTable(sinkDdl);

        // Both inserts read from v1 and write to the same sink.
        StatementSet inserts = this.util().tableEnv().createStatementSet();
        String plainInsert = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                           |INSERT INTO sink\n"
                        + "                           |SELECT * FROM v1\n"
                        + "                           |")).stripMargin();
        inserts.addInsertSql(plainInsert);
        String rankedInsert = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                           |INSERT INTO sink\n"
                        + "                           |SELECT a, b, c FROM (\n"
                        + "                           |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS rn\n"
                        + "                           |  FROM v1\n"
                        + "                           |) WHERE rn < 3\n"
                        + "                           |")).stripMargin();
        inserts.addInsertSql(rankedInsert);
        this.util().verifyExecPlan(inserts);
    }

    /**
     * Creates a {@code sink} with primary key {@code (a)} and verifies the explain output (with
     * changelog mode) of an insert fed by a ROW_NUMBER query partitioned by {@code b} and filtered
     * to {@code rn <= 100}.
     */
    @Test
    public void testRankOutputUpsertKeyNotMatchSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE TABLE sink (\n"
                        + "                               | a INT,\n"
                        + "                               | b VARCHAR,\n"
                        + "                               | c BIGINT,\n"
                        + "                               | PRIMARY KEY (a) NOT ENFORCED\n"
                        + "                               |) WITH (\n"
                        + "                               | 'connector' = 'values'\n"
                        + "                               | ,'sink-insert-only' = 'false'\n"
                        + "                               |)\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |INSERT INTO sink\n"
                        + "        |SELECT a, b, c FROM (\n"
                        + "        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n"
                        + "        |  FROM MyTable\n"
                        + "        |  )\n"
                        + "        |WHERE rn <= 100\n"
                        + "        |")).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertSql, details);
    }

    /**
     * Creates a {@code sink} with primary key {@code (a, b)} and verifies the explain output (with
     * changelog mode) of an insert fed by a ROW_NUMBER query partitioned by {@code a} and filtered
     * to {@code rn <= 100}.
     */
    @Test
    public void testRankOutputUpsertKeyInSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE TABLE sink (\n"
                        + "                               | a INT,\n"
                        + "                               | b VARCHAR,\n"
                        + "                               | c BIGINT,\n"
                        + "                               | PRIMARY KEY (a, b) NOT ENFORCED\n"
                        + "                               |) WITH (\n"
                        + "                               | 'connector' = 'values'\n"
                        + "                               | ,'sink-insert-only' = 'false'\n"
                        + "                               |)\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |INSERT INTO sink\n"
                        + "        |SELECT a, b, c FROM (\n"
                        + "        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rn\n"
                        + "        |  FROM MyTable\n"
                        + "        |  )\n"
                        + "        |WHERE rn <= 100\n"
                        + "        |")).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertSql, details);
    }

    /**
     * Creates a {@code sink(a, c, rn)} with primary key {@code (a)} and verifies the explain
     * output (with changelog mode) of an insert that selects {@code a, c, rn} from a ROW_NUMBER
     * query partitioned by {@code b} and filtered to {@code rn <= 100}; the partition column
     * {@code b} is not part of the projection.
     */
    @Test
    public void testRankOutputLostUpsertKeyWithSinkPk() {
        String sinkDdl = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "                               |CREATE TABLE sink (\n"
                        + "                               | a INT,\n"
                        + "                               | c BIGINT,\n"
                        + "                               | rn BIGINT,\n"
                        + "                               | PRIMARY KEY (a) NOT ENFORCED\n"
                        + "                               |) WITH (\n"
                        + "                               | 'connector' = 'values'\n"
                        + "                               | ,'sink-insert-only' = 'false'\n"
                        + "                               |)\n"
                        + "                               |")).stripMargin();
        this.util().tableEnv().executeSql(sinkDdl);

        String insertSql = new StringOps(Predef$.MODULE$.augmentString(
                "\n"
                        + "        |INSERT INTO sink\n"
                        + "        |SELECT a, c, rn FROM (\n"
                        + "        |  SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY c DESC) AS rn\n"
                        + "        |  FROM MyTable\n"
                        + "        |  )\n"
                        + "        |WHERE rn <= 100\n"
                        + "        |")).stripMargin();
        Seq<ExplainDetail> details = (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE});
        this.util().verifyExplainInsert(insertSql, details);
    }

    /**
     * Registers the data stream table {@code MyTable} with fields {@code a, b, c} plus
     * {@code proctime} and {@code rowtime} attributes, using an anonymous
     * {@code CaseClassTypeInfo} over {@code Tuple3<Object, String, Object>}.
     *
     * <p>NOTE(review): this is decompiler output; the anonymous type-info class below contains
     * fragments that are not valid Java source.
     */
    public RankTest() {
        this.util().addDataStream("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor: returns the `types` field of the given instance.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Creates one serializer per field (index 0 until getArity()) from the field type
            // information and returns a ScalaCaseClassSerializer built from them.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    // NOTE(review): `fieldSerializers$1` looks like the decompiler's name for the
                    // captured `fieldSerializers` array above - confirm against the Scala source.
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // Anonymous serializer whose createInstance rebuilds a Tuple3 from
                // (int, String, long) field values. It is assigned to `unused` and not
                // referenced again in this method.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                // NOTE(review): the returned serializer is a plain ScalaCaseClassSerializer, not
                // the anonymous one above - presumably a decompilation artifact; verify.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Synthetic lambda-deserialization hook. The MethodHandle array content is the
            // decompiler's rendering of the lambda target and is not valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.RankTest$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
    }
}

