/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001m4Aa\u0003\u0007\u0001;!AA\u0005\u0001B\u0001B\u0003%Q\u0005C\u0003>\u0001\u0011\u0005a\bC\u0003C\u0001\u0011\u00053\tC\u0003V\u0001\u0011\u00051\tC\u0003[\u0001\u0011\u00051\tC\u0003]\u0001\u0011\u00051\tC\u0003_\u0001\u0011\u00051\tC\u0003a\u0001\u0011\u00051\tC\u0003c\u0001\u0011\u00051\tC\u0003e\u0001\u0011\u00051IA\rXS:$wn\u001e+bE2,g)\u001e8di&|g.\u0013+DCN,'BA\u0007\u000f\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u001fA\taa\u001d;sK\u0006l'BA\t\u0013\u0003\u001d\u0011XO\u001c;j[\u0016T!a\u0005\u000b\u0002\u000fAd\u0017M\u001c8fe*\u0011QCF\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003/a\tQA\u001a7j].T!!\u0007\u000e\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005Y\u0012aA8sO\u000e\u00011C\u0001\u0001\u001f!\ty\"%D\u0001!\u0015\t\t\u0003#A\u0003vi&d7/\u0003\u0002$A\tQ2\u000b\u001e:fC6LgnZ,ji\"\u001cF/\u0019;f)\u0016\u001cHOQ1tK\u0006!Qn\u001c3f!\t1#H\u0004\u0002(q9\u0011\u0001f\u000e\b\u0003SYr!AK\u001b\u000f\u0005-\"dB\u0001\u00174\u001d\ti#G\u0004\u0002/c5\tqF\u0003\u000219\u00051AH]8pizJ\u0011aG\u0005\u00033iI!a\u0006\r\n\u0005U1\u0012BA\n\u0015\u0013\t\t\"#\u0003\u0002\"!%\u0011\u0011\bI\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003wq\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005e\u0002\u0013A\u0002\u001fj]&$h\b\u0006\u0002@\u0003B\u0011\u0001\tA\u0007\u0002\u0019!)AE\u0001a\u0001K\u00051!-\u001a4pe\u0016$\u0012\u0001\u0012\t\u0003\u000b\"k\u0011A\u0012\u0006\u0002\u000f\u0006)1oY1mC&\u0011\u0011J\u0012\u0002\u0005+:LG\u000f\u000b\u0002\u0004\u0017B\u0011AjU\u0007\u0002\u001b*\u0011ajT\u0001\u0004CBL'B\u0001)R\u0003\u001dQW\u000f]5uKJT!A\u0015\u000e\u0002\u000b),h.\u001b;\n\u0005Qk%A\u0003\"fM>\u0014X-R1dQ\u0006\u0001B/Z:u)Vl'\r\\3XS:$wn\u001e\u0015\u0003\t]\u0003\"\u0001\u0014-\n\u0005ek%\u0001\u0004+fgR$V-\u001c9mCR,\u0017!\b;fgR$V/\u001c2mK^Kg\u000eZ8x)Z3u+\u001b;i\u001f\u001a47/\u001a;)\u0005\u00159\u0016!\n;fgR$V/\u001c2mK^Kg\u000eZ8x)Z3u+\u001b;i\u001d\u0016<\u0017\r^5wK>3gm]3uQ\t1q+A\u0007uKN$\bj\u001c9XS:$wn\u001e\u0015\u0003\u000f]\u000b!\u0003^3ti\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po\"\u0012\u0001bV\u0001\u0012i\u0016\u001cHoU3tg&|gnV5oI><\bFA\u0005X\u0003\u0001\"Xm\u001d;TKN\u001c\u0018n\u001c8XS:$wn^,ji\"\u0004\u0016M\u001d;ji&|gNQ=)\u0005)9\u0006\u0006\u0002\u0001h[:\u0004\"\u0001[6\u000e\u0003%T!A['\u0002\u0013\u0015DH/\u001a8tS>t\u0017B\u00017j\u0005))\u0005\u0010^3oI^KG\u000f[\u0001\u0006m\u0006dW/\u001a\u0017\u0002_\u000e\n\u0001\u000f\u0005\u0002rs6\t!O\u0003\u0002ti\u0006i\u0001/\u0019:b[\u0016$XM]5{K\u0012T!!\u001e<\u0002\u0015\u0015DH/\u001a8tS>t7O\u0003\u0002So*\u0011\u0001PF\u0001\ni\u0016\u001cH/\u001e;jYNL!A\u001f:\u00035A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0012=uK:\u001c\u0018n\u001c8")
public class WindowTableFunctionITCase
extends StreamingWithStateTestBase {
    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixeddelay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)BoxesRunTime.boxToInteger((int)1));
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(0L));
        this.env().configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
        FailingCollectionSource.reset();
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(697).append("\n                       |CREATE TABLE T1 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(dataId).append("',\n                       | 'failing-source' = 'true'\n                       |)\n                       |").toString())).stripMargin());
    }

    @TestTemplate
    public void testTumbleWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVFWithOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:06,2020-10-10T00:00:05.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testTumbleWindowTVFWithNegativeOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM TABLE(\n         |  TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:59,2020-10-10T00:00:04,2020-10-10T00:00:03.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:29,2020-10-10T00:00:34,2020-10-10T00:00:33.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testHopWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(\n        |  HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-09T23:59:55,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:10,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:25,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testCumulateWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(\n        |  CUMULATE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '15' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSessionWindow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(SESSION(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:32,2020-10-10T00:00:39,2020-10-10T00:00:38.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSessionWindowWithPartitionBy() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TO_TIMESTAMP(`ts`),\n        |  `int`,\n        |  `double`,\n        |  `float`,\n        |  `bigdec`,\n        |  `string`,\n        |  `name`,\n        |  CAST(`rowtime` AS STRING),\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM TABLE(SESSION(TABLE T1 PARTITION BY `name`, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:04,5,5.0,null,5.55,Hi,a,2020-10-10 00:00:04.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:12,2020-10-10T00:00:11.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:01,2020-10-10T00:00:13,2020-10-10T00:00:12.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:32,2020-10-10T00:00:37,2020-10-10T00:00:36.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    public WindowTableFunctionITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

