/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005eb\u0001\u0002\f\u0018\u0001!B\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\u0006\u0011\u0002!\t!\u0013\u0005\u0006\u001b\u0002!\tE\u0014\u0005\u0006A\u0002!\tA\u0014\u0005\u0006K\u0002!\tA\u0014\u0005\u0006O\u0002!\tA\u0014\u0005\u0006S\u0002!\tA\u0014\u0005\u0006W\u0002!\tA\u0014\u0005\u0006[\u0002!\tA\u0014\u0005\u0006_\u0002!\tA\u0014\u0005\u0006c\u0002!\tA\u0014\u0005\u0006g\u0002!\tA\u0014\u0005\u0006k\u0002!\tA\u0014\u0005\u0006o\u0002!\tA\u0014\u0005\u0006s\u0002!\tA\u0014\u0005\u0006w\u0002!\tA\u0014\u0005\u0006{\u0002!\tA\u0014\u0005\u0006\u007f\u0002!\tA\u0014\u0005\u0007\u0003\u0007\u0001A\u0011\u0001(\t\r\u0005\u001d\u0001\u0001\"\u0001O\u0011\u0019\tY\u0001\u0001C\u0001\u001d\n\u0001r+\u001b8e_^\u0014\u0016M\\6J)\u000e\u000b7/\u001a\u0006\u00031e\t1a]9m\u0015\tQ2$\u0001\u0004tiJ,\u0017-\u001c\u0006\u00039u\tqA];oi&lWM\u0003\u0002\u001f?\u00059\u0001\u000f\\1o]\u0016\u0014(B\u0001\u0011\"\u0003\u0015!\u0018M\u00197f\u0015\t\u00113%A\u0003gY&t7N\u0003\u0002%K\u00051\u0011\r]1dQ\u0016T\u0011AJ\u0001\u0004_J<7\u0001A\n\u0003\u0001%\u0002\"AK\u0017\u000e\u0003-R!\u0001L\u000e\u0002\u000bU$\u0018\u000e\\:\n\u00059Z#AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017\u0001B7pI\u0016\u0004\"!M#\u000f\u0005I\u001aeBA\u001aC\u001d\t!\u0014I\u0004\u00026\u0001:\u0011ag\u0010\b\u0003oyr!\u0001O\u001f\u000f\u0005ebT\"\u0001\u001e\u000b\u0005m:\u0013A\u0002\u001fs_>$h(C\u0001'\u0013\t!S%\u0003\u0002#G%\u0011\u0001%I\u0005\u0003=}I!\u0001H\u000f\n\u00051Z\u0012B\u0001#,\u0003i\u0019FO]3b[&twmV5uQN#\u0018\r^3UKN$()Y:f\u0013\t1uI\u0001\tTi\u0006$XMQ1dW\u0016tG-T8eK*\u0011AiK\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\u0005)c\u0005CA&\u0001\u001b\u00059\u0002\"B\u0018\u0003\u0001\u0004\u0001\u0014A\u00022fM>\u0014X\rF\u0001P!\t\u00016+D\u0001R\u0015\u0005\u0011\u0016!B:dC2\f\u0017B\u0001+R\u0005\u0011)f.\u001b;)\u0005\r1\u0006CA,_\u001b\u0005A&BA-[\u0003\r\t\u0007/\u001b\u0006\u00037r\u000bqA[;qSR,'O\u0003\u0002^K\u0005)!.\u001e8ji&\u0011q\f\u0017\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0005;fgR$V/\u001c2mK^Kg\u000eZ8xQ\t!!\r\u0005\u0002XG&\u0011A\r\u0017\u0002\r)\u0016\u001cH\u000fV3na2\fG/Z\u0001\u001fi\u0016\u001cH\u000fV;nE2,w+\u001b8e_^<\u0016\u000e\u001e5SC:\\wJ\u001a4tKRD#!\u00022\u0002CQ,7\u000f\u001e+v[\ndWmV5oI><x+\u001b;i_V$(+\u00198l\u001dVl'-\u001a:)\u0005\u0019\u0011\u0017a\u0005;fgR$V/\u001c2mK^Kg\u000eZ8x)Z3\u0005FA\u0004c\u0003u!Xm\u001d;Uk6\u0014G.Z,j]\u0012|w\u000f\u0016,G/&$\bn\u00144gg\u0016$\bF\u0001\u0005c\u0003\u0015\"Xm\u001d;Uk6\u0014G.Z,j]\u0012|w\u000f\u0016,G/&$\bNT3hCRLg/Z(gMN,G\u000f\u000b\u0002\nE\u0006YB/Z:u)Vl'\r\\3XS:$wn\u001e+W\r^KG\u000f[\"bY\u000eD#A\u00032\u0002\u001bQ,7\u000f\u001e%pa^Kg\u000eZ8xQ\tY!-A\u000euKN$\bj\u001c9XS:$wn^,ji\"\u0014\u0016M\\6PM\u001a\u001cX\r\u001e\u0015\u0003\u0019\t\fa\u0004^3ti\"{\u0007oV5oI><x+\u001b;i_V$(+\u00198l\u001dVl'-\u001a:)\u00055\u0011\u0017\u0001\u0005;fgRDu\u000e],j]\u0012|w\u000f\u0016,GQ\tq!-\u0001\ruKN$\bj\u001c9XS:$wn\u001e+W\r^KG\u000f[\"bY\u000eD#a\u00042\u0002%Q,7\u000f^\"v[Vd\u0017\r^3XS:$wn\u001e\u0015\u0003!\t\f\u0001\u0005^3ti\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po^KG\u000f\u001b*b].|eMZ:fi\"\u0012\u0011CY\u0001$i\u0016\u001cHoQ;nk2\fG/Z,j]\u0012|woV5uQ>,HOU1oW:+XNY3sQ\t\u0011\"-\u0001\u0005uKN$Hk\u001c92Q\t\u0019\"-A\u000buKN$8)^7vY\u0006$XmV5oI><HK\u0016$)\u0005Q\u0011\u0017!\b;fgR\u001cU/\\;mCR,w+\u001b8e_^$fKR,ji\"\u001c\u0015\r\\2)\u0005U\u0011\u0007f\u0002\u0001\u0002\u0012\u0005u\u0011q\u0004\t\u0005\u0003'\tI\"\u0004\u0002\u0002\u0016)\u0019\u0011q\u0003-\u0002\u0013\u0015DH/\u001a8tS>t\u0017\u0002BA\u000e\u0003+\u0011!\"\u0012=uK:$w+\u001b;i\u0003\u00151\u0018\
r\\;fY\t\t\tc\t\u0002\u0002$A!\u0011QEA\u001b\u001b\t\t9C\u0003\u0003\u0002*\u0005-\u0012!\u00049be\u0006lW\r^3sSj,GM\u0003\u0003\u0002.\u0005=\u0012AC3yi\u0016t7/[8og*\u0019Q,!\r\u000b\u0007\u0005M\u0012%A\u0005uKN$X\u000f^5mg&!\u0011qGA\u0014\u0005i\u0001\u0016M]1nKR,'/\u001b>fIR+7\u000f^#yi\u0016t7/[8o\u0001")
public class WindowRankITCase
extends StreamingWithStateTestBase {
    /**
     * Prepares the shared fixture for every test in this class.
     *
     * <p>After delegating to the parent setup, this enables checkpointing
     * (interval argument {@code 100L}, {@code EXACTLY_ONCE} mode), applies a
     * {@code fixeddelay} restart strategy with one attempt and a zero delay,
     * resets {@link FailingCollectionSource}, registers the
     * {@code windowDataWithTimestamp} test data, creates table {@code T1} on top
     * of it through the {@code values} connector with {@code failing-source}
     * enabled, and registers the {@code concat_distinct_agg} aggregate function.
     */
    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);

        // Restart strategy options: fixed delay, a single attempt, no delay.
        Configuration restartConf = new Configuration();
        restartConf.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixeddelay");
        restartConf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)BoxesRunTime.boxToInteger((int)1));
        restartConf.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(0L));
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        this.env().configure((ReadableConfig)restartConf, contextLoader);

        FailingCollectionSource.reset();

        // The id returned by the factory is embedded into the table's 'data-id' option.
        String registeredId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String ddl = "\n"
            + "                       |CREATE TABLE T1 (\n"
            + "                       | `ts` STRING,\n"
            + "                       | `int` INT,\n"
            + "                       | `double` DOUBLE,\n"
            + "                       | `float` FLOAT,\n"
            + "                       | `bigdec` DECIMAL(10, 2),\n"
            + "                       | `string` STRING,\n"
            + "                       | `name` STRING,\n"
            + "                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n"
            + "                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
            + "                       |) WITH (\n"
            + "                       | 'connector' = 'values',\n"
            + "                       | 'data-id' = '" + registeredId + "',\n"
            + "                       | 'failing-source' = 'true'\n"
            + "                       |)\n"
            + "                       |";
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(ddl)).stripMargin());

        this.tEnv().createFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
    }

    @TestTemplate
    public void testTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", (List)Nil$.MODULE$));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", (List)Nil$.MODULE$));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Applies {@code ROW_NUMBER()} directly on the output of the 5-second
     * {@code TUMBLE} table function (no aggregation), partitioned by window and
     * {@code name} and ordered by {@code int} descending, keeping
     * {@code rownum <= 2}.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testTumbleWindowTVF() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "         |SELECT\n"
            + "         |  TO_TIMESTAMP(`ts`),\n"
            + "         |  `int`,\n"
            + "         |  `double`,\n"
            + "         |  `float`,\n"
            + "         |  `bigdec`,\n"
            + "         |  `string`,\n"
            + "         |  `name`,\n"
            + "         |  CAST(`rowtime` AS STRING),\n"
            + "         |  window_start,\n"
            + "         |  window_end,\n"
            + "         |  window_time\n"
            + "         |FROM (\n"
            + "         |  SELECT *,\n"
            + "         |    ROW_NUMBER() OVER(\n"
            + "         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "         |  FROM TABLE(\n"
            + "         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
            + "         |)\n"
            + "         |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Variant of the tumbling-window TVF ranking test that passes a fourth
     * {@code TUMBLE} argument of {@code INTERVAL '1' SECOND}; the expected rows
     * carry window boundaries such as {@code 00:00:01} to {@code 00:00:06}.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testTumbleWindowTVFWithOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "         |SELECT\n"
            + "         |  TO_TIMESTAMP(`ts`),\n"
            + "         |  `int`,\n"
            + "         |  `double`,\n"
            + "         |  `float`,\n"
            + "         |  `bigdec`,\n"
            + "         |  `string`,\n"
            + "         |  `name`,\n"
            + "         |  CAST(`rowtime` AS STRING),\n"
            + "         |  window_start,\n"
            + "         |  window_end,\n"
            + "         |  window_time\n"
            + "         |FROM (\n"
            + "         |  SELECT *,\n"
            + "         |    ROW_NUMBER() OVER(\n"
            + "         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "         |  FROM TABLE(\n"
            + "         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n"
            + "         |)\n"
            + "         |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Variant of the tumbling-window TVF ranking test that passes a fourth
     * {@code TUMBLE} argument of {@code INTERVAL '-1' SECOND}; the expected rows
     * carry window boundaries such as {@code 23:59:59} to {@code 00:00:04}.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testTumbleWindowTVFWithNegativeOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "         |SELECT\n"
            + "         |  TO_TIMESTAMP(`ts`),\n"
            + "         |  `int`,\n"
            + "         |  `double`,\n"
            + "         |  `float`,\n"
            + "         |  `bigdec`,\n"
            + "         |  `string`,\n"
            + "         |  `name`,\n"
            + "         |  CAST(`rowtime` AS STRING),\n"
            + "         |  window_start,\n"
            + "         |  window_end,\n"
            + "         |  window_time\n"
            + "         |FROM (\n"
            + "         |  SELECT *,\n"
            + "         |    ROW_NUMBER() OVER(\n"
            + "         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "         |  FROM TABLE(\n"
            + "         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n"
            + "         |)\n"
            + "         |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999",
            "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:29,2020-10-10T00:00:34,2020-10-10T00:00:33.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Ranks rows of the 5-second {@code TUMBLE} table function per window and
     * {@code name} (ordered by {@code int} descending, {@code rownum <= 2}) while
     * projecting only {@code int}, {@code string}, {@code name} and the three
     * window columns.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testTumbleWindowTVFWithCalc() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "        |SELECT\n"
            + "        |  `int`,\n"
            + "        |  `string`,\n"
            + "        |  `name`,\n"
            + "        |  window_start,\n"
            + "        |  window_end,\n"
            + "        |  window_time\n"
            + "        |FROM (\n"
            + "        |  SELECT *,\n"
            + "        |    ROW_NUMBER() OVER(\n"
            + "        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "        |  FROM TABLE(\n"
            + "        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n"
            + "        |)\n"
            + "        |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "5,null,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2,Comment#1,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "3,Comment#2,a,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "6,Hi,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "3,Hello,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Ranks per-name aggregates inside {@code HOP} windows over {@code T1}
     * (interval arguments {@code '5' SECOND} and {@code '10' SECOND}) by
     * {@code sum_b} descending and keeps the rows with {@code rownum <= 2}.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testHopWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "        |SELECT *\n"
            + "        |FROM\n"
            + "        |(\n"
            + "        |  SELECT *,\n"
            + "        |    ROW_NUMBER() OVER(\n"
            + "        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n"
            + "        |  FROM (\n"
            + "        |    SELECT\n"
            + "        |      `name`,\n"
            + "        |      window_start,\n"
            + "        |      window_end,\n"
            + "        |      COUNT(*) as cnt,\n"
            + "        |      SUM(`bigdec`) as sum_b,\n"
            + "        |      MAX(`double`) as max_d,\n"
            + "        |      MIN(`float`) as min_f,\n"
            + "        |      COUNT(DISTINCT `string`) as uv,\n"
            + "        |      concat_distinct_agg(`string`) as distinct_str\n"
            + "        |  FROM TABLE(\n"
            + "        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
            + "        |    GROUP BY `name`, window_start, window_end\n"
            + "        |  )\n"
            + "        |)\n"
            + "        |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Comment#2|Hi|Comment#1,1",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,1",
            "b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2",
            "null,2020-10-10T00:00:25,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    @TestTemplate
    public void testHopWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2", (List)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Applies {@code ROW_NUMBER()} directly on the output of the {@code HOP}
     * table function (interval arguments {@code '5' SECOND} and
     * {@code '10' SECOND}), partitioned by window and {@code name} and ordered
     * by {@code int} descending, keeping {@code rownum <= 2}.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testHopWindowTVF() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "         |SELECT\n"
            + "         |  TO_TIMESTAMP(`ts`),\n"
            + "         |  `int`,\n"
            + "         |  `double`,\n"
            + "         |  `float`,\n"
            + "         |  `bigdec`,\n"
            + "         |  `string`,\n"
            + "         |  `name`,\n"
            + "         |  CAST(`rowtime` AS STRING),\n"
            + "         |  window_start,\n"
            + "         |  window_end,\n"
            + "         |  window_time\n"
            + "         |FROM (\n"
            + "         |  SELECT *,\n"
            + "         |    ROW_NUMBER() OVER(\n"
            + "         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "         |  FROM TABLE(\n"
            + "         |      HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
            + "         |)\n"
            + "         |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Ranks rows of the {@code HOP} table function (interval arguments
     * {@code '5' SECOND} and {@code '10' SECOND}) per window and {@code name}
     * (ordered by {@code int} descending, {@code rownum <= 2}) while projecting
     * only {@code int}, {@code string}, {@code name} and the three window
     * columns.
     *
     * <p>Sink output and expected rows are sorted and newline-joined before
     * comparison.
     */
    @TestTemplate
    public void testHopWindowTVFWithCalc() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n"
            + "        |SELECT\n"
            + "        |  `int`,\n"
            + "        |  `string`,\n"
            + "        |  `name`,\n"
            + "        |  window_start,\n"
            + "        |  window_end,\n"
            + "        |  window_time\n"
            + "        |FROM (\n"
            + "        |  SELECT *,\n"
            + "        |    ROW_NUMBER() OVER(\n"
            + "        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n"
            + "        |  FROM TABLE(\n"
            + "        |      HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n"
            + "        |)\n"
            + "        |WHERE rownum <= 2\n"
            + "      ")).stripMargin();

        TestingAppendSink appendSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)appendSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "5,null,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2,Comment#1,a,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "5,null,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "6,Hi,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "3,Comment#2,a,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "3,Hello,b,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "4,Hi,b,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999",
            "7,null,null,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999",
            "1,Comment#3,b,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"
        };
        Seq expectedSeq = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Compare order-insensitively: sort both sides, then join with newlines.
        String actualText = ((TraversableOnce)appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expectedSeq.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Runs a Top-2 query ({@code rownum <= 2}) per {@code window_start, window_end}, ordered by
     * {@code sum_b DESC}, on top of an aggregation grouped by {@code name} and a CUMULATE window
     * over {@code T1}. The rows collected by the append sink are compared with the expected rows
     * after both sides are sorted as strings.
     */
    @TestTemplate
    public void testCumulateWindow() {
        String marginedQuery = "\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();

        // Execute the query and collect every emitted row in an append-only sink.
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:15,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0,null,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Emission order is not asserted: both sides are sorted before being joined.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    @TestTemplate
    public void testCumulateWindowWithRankOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2", (List)Nil$.MODULE$)))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowWithoutRankNumber() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3", (List)Nil$.MODULE$)))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTop1() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,1", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindowTVF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n         |  FROM TABLE(\n         |      CUMULATE(\n         |        TABLE T1,\n         |        DESCRIPTOR(rowtime),\n         |        INTERVAL '5' SECOND,\n         |        INTERVAL '15' SECOND))\n         |)\n         |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 
00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Runs a per-window Top-2 query ({@code rownum <= 2}) directly over a CUMULATE window table
     * function on {@code T1}, partitioned by {@code window_start, window_end, name} and ordered by
     * {@code int DESC}, while projecting only a subset of the columns. The rows collected by the
     * append sink are compared with the expected rows after both sides are sorted as strings.
     */
    @TestTemplate
    public void testCumulateWindowTVFWithCalc() {
        String marginedQuery = "\n        |SELECT\n        |  `int`,\n        |  `string`,\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end, `name` ORDER BY `int` DESC) as rownum\n        |  FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |)\n        |WHERE rownum <= 2\n      ";
        String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();

        // Execute the query and collect every emitted row in an append-only sink.
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(query)).toDataStream().addSink((SinkFunction)resultSink);
        this.env().execute();

        String[] expectedRows = new String[]{
            "5,null,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "2,Comment#1,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999",
            "5,null,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "5,null,a,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "5,Hi,a,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "6,Hi,b,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999",
            "3,Hello,b,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999",
            "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999",
            "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999",
            "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999",
            "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999",
            "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999",
            "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999",
            "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999",
            "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Emission order is not asserted: both sides are sorted before being joined.
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        String expectedText = ((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n");
        Assertions.assertThat(actualText).isEqualTo(expectedText);
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * @param mode state backend mode, passed unchanged to the superclass constructor
     */
    public WindowRankITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

