/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Collection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.stream.sql.WindowJoinITCase$;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

@ExtendWith(value={ParameterizedTestExtension.class})
@ScalaSignature(bytes="\u0006\u0001\u0005]f\u0001\u0002\u0011\"\u0001IB\u0001\"\u000f\u0001\u0003\u0002\u0003\u0006IA\u000f\u0005\t%\u0002\u0011\t\u0011)A\u0005'\"A\u0011\f\u0001B\u0001B\u0003%1\u000bC\u0003[\u0001\u0011\u00051\fC\u0004b\u0001\t\u0007I\u0011\u00012\t\r-\u0004\u0001\u0015!\u0003d\u0011\u0015a\u0007\u0001\"\u0011n\u0011\u0015a\b\u0001\"\u0001n\u0011\u0019\t\u0019\u0001\u0001C\u0001[\"1\u0011q\u0001\u0001\u0005\u00025Da!a\u0003\u0001\t\u0003i\u0007BBA\b\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002\u0014\u0001!\t!\u001c\u0005\u0007\u0003/\u0001A\u0011A7\t\r\u0005m\u0001\u0001\"\u0001n\u0011\u0019\ty\u0002\u0001C\u0001[\"1\u00111\u0005\u0001\u0005\u00025Da!a\n\u0001\t\u0003i\u0007BBA\u0016\u0001\u0011\u0005Q\u000e\u0003\u0004\u00020\u0001!\t!\u001c\u0005\u0007\u0003g\u0001A\u0011A7\t\r\u0005]\u0002\u0001\"\u0001n\u0011\u0019\tY\u0004\u0001C\u0001[\"1\u0011q\b\u0001\u0005\u00025Da!a\u0011\u0001\t\u0003i\u0007BBA$\u0001\u0011\u0005Q\u000e\u0003\u0004\u0002L\u0001!\t!\\\u0004\b\u0003s\n\u0003\u0012AA>\r\u0019\u0001\u0013\u0005#\u0001\u0002~!1!,\bC\u0001\u0003\u000bCq!a\"\u001e\t\u0003\tII\u0001\tXS:$wn\u001e&pS:LEkQ1tK*\u0011!eI\u0001\u0004gFd'B\u0001\u0013&\u0003\u0019\u0019HO]3b[*\u0011aeJ\u0001\beVtG/[7f\u0015\tA\u0013&A\u0004qY\u0006tg.\u001a:\u000b\u0005)Z\u0013!\u0002;bE2,'B\u0001\u0017.\u0003\u00151G.\u001b8l\u0015\tqs&\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002a\u0005\u0019qN]4\u0004\u0001M\u0011\u0001a\r\t\u0003i]j\u0011!\u000e\u0006\u0003m\u0015\nQ!\u001e;jYNL!\u0001O\u001b\u00035M#(/Z1nS:<w+\u001b;i'R\fG/\u001a+fgR\u0014\u0015m]3\u0002\t5|G-\u001a\t\u0003w=s!\u0001P'\u000f\u0005ubeB\u0001 L\u001d\ty$J\u0004\u0002A\u0013:\u0011\u0011\t\u0013\b\u0003\u0005\u001es!a\u0011$\u000e\u0003\u0011S!!R\u0019\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0001\u0014B\u0001\u00180\u0013\taS&\u0003\u0002+W%\u0011\u0001&K\u0005\u0003M\u001dJ!AN\u0013\n\u00059+\u0014AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017B\u0001)R\u0005A\u0019F/\u0019;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u0002Ok\u0005yQo]3US6,7\u000f^1na2#(\u0010\u0005\u0002U/6\tQKC\u0001W\u0003\u0015\u00198-\u00197b\u0013\tAVKA\u0004C_>dW-\u00198\u0002!\u0015t\u0017M\u00197f\u0003NLhnY*uCR,\u0017A\u0002\u001fj]&$h\b\u0006\u0003]=~\u0003\u0007CA/\u0001\u001b\u0005\t\u0003\"B\u001d\u0005\u0001\u0004Q\u0004\"\u0002*\u0005\u0001\u0004\u0019\u0006\"B-\u0005\u0001\u0004\u0019\u0016!D*I\u0003:;\u0005*Q%`5>sU)F\u0001d!\t!\u0017.D\u0001f\u0015\t1w-\u0001\u0003uS6,'\"\u00015\u0002\t)\fg/Y\u0005\u0003U\u0016\u0014aAW8oK&#\u0017AD*I\u0003:;\u0005*Q%`5>sU\tI\u0001\u0007E\u00164wN]3\u0015\u00039\u0004\"\u0001V8\n\u0005A,&\u0001B+oSRD#a\u0002:\u0011\u0005MTX\"\u0001;\u000b\u0005U4\u0018aA1qS*\u0011q\u000f_\u0001\bUV\u0004\u0018\u000e^3s\u0015\tIx&A\u0003kk:LG/\u0003\u0002|i\nQ!)\u001a4pe\u0016,\u0015m\u00195\u0002\u001bQ,7\u000f^%o]\u0016\u0014(j\\5oQ\tAa\u0010\u0005\u0002t\u007f&\u0019\u0011\u0011\u0001;\u0003\u0019Q+7\u000f\u001e+f[Bd\u0017\r^3\u0002%Q,7\u000f^%o]\u0016\u0014(j\\5o\u001f:<FK\u0012\u0015\u0003\u0013y\fA\u0004^3ti&sg.\u001a:K_&twJ\\,U\r^KG\u000f[(gMN,G\u000f\u000b\u0002\u000b}\u0006!C/Z:u\u0013:tWM\u001d&pS:|en\u0016+G/&$\bNT3hCRLg/Z(gMN,G\u000f\u000b\u0002\f}\u0006\u0011C/Z:u\u0013:tWM\u001d&pS:<\u0016\u000e\u001e5Jg:{G\u000fR5ti&t7\r\u001e$s_6D#\u0001\u0004@\u0002OQ,7\u000f^%o]\u0016\u0014(j\\5o/&$\b.S:O_R$\u0015n\u001d;j]\u000e$hI]8n\u001f:<FK\u0012\u0015\u0003\u001by\f!\u0003^3tiN+W.\u001b&pS:,\u00050[:ug\"\u0012aB`\u0001\u0016i\u0016\u001cHoU3nS*{\u0017N\\#ySN$8o\u0016+GQ\tya0\u0001\buKN$8+Z7j\u0015>Lg.\u0013()\u0005Aq\u0018A\u0005;fgR\u001cV-\\5K_&t\u0017JT0X)\u001aC#!\u0005@\u0002+Q,7\u000f^!oi&Tu.\u001b8O_R,\u00050[:ug\"\u0012!C`\u0001\u0019i\u0016\u001cH/\u00118uS*{\u0017N\u001c(pi\u0016C\u0018n\u001d;t/R3\u0005FA\n\u007f\u0003E!Xm\u001d;B]RL'j\\5o\u001d>$\u0018J\u0014\u0015\u0003)y\fQ\u0003^3ti\u0006sG/\u001b&pS:tu\u000e^%O?^#f\t\u000b\u0002\u0016}\u0006aA/Z:u\u0019\u00164GOS8j]\"\u0012aC`\u0001\"i\u0016\u001cH\u000fT3gi*{\u0017N\\,ji\"L5OT8u\t&\u001cH/\u001b8di\u001a\u0013x.\u001c\u0015\u0003/y\fQ\u0002^3tiJKw\r\u001b;K_&t\u0007F\u0001\r\u007f\u0003\t\"Xm\u001d;SS\u001eDGOS8j]^KG\u000f[%t\u001d>$H)[:uS:\u001cGO\u0012:p[\"\u0012\u0011D`\u0001\u000ei\u0016\u001cHoT;uKJTu.\u001b8)\u0005iq\u0018A\t;fgR|U\u000f^3s\u0015>LgnV5uQ&\u001bhj\u001c;ESN$\u0018N\\2u\rJ|W\u000e\u000b\u0002\u001c}\":\u0001!!\u0015\u0002^\u0005}\u0003\u0003BA*\u00033j!!!\u0016\u000b\u0007\u0005]C/A\u0005fqR,gn]5p]&!\u00111LA+\u0005))\u0005\u0010^3oI^KG\u000f[\u0001\u0006m\u0006dW/\u001a\u0017\u0003\u0003C\u001a#!a\u0019\u0011\t\u0005\u0015\u0014QO\u0007\u0003\u0003ORA!!\u001b\u0002l\u0005i\u0001/\u0019:b[\u0016$XM]5{K\u0012TA!!\u001c\u0002p\u0005QQ\r\u001f;f]NLwN\\:\u000b\u0007e\f\tHC\u0002\u0002t-\n\u0011\u0002^3tiV$\u0018\u000e\\:\n\t\u0005]\u0014q\r\u0002\u001b!\u0006\u0014\u0018-\\3uKJL'0\u001a3UKN$X\t\u001f;f]NLwN\\\u0001\u0011/&tGm\\<K_&t\u0017\nV\"bg\u0016\u0004\"!X\u000f\u0014\u0007u\ty\bE\u0002U\u0003\u0003K1!a!V\u0005\u0019\te.\u001f*fMR\u0011\u00111P\u0001\u000ba\u0006\u0014\u0018-\\3uKJ\u001cHCAAF!\u0019\ti)a%\u0002\u00186\u0011\u0011q\u0012\u0006\u0004\u0003#;\u0017\u0001B;uS2LA!!&\u0002\u0010\nQ1i\u001c7mK\u000e$\u0018n\u001c8\u0011\u000bQ\u000bI*!(\n\u0007\u0005mUKA\u0003BeJ\f\u0017\u0010\u0005\u0003\u0002 \u0006\u0015VBAAQ\u0015\r\t\u0019kZ\u0001\u0005Y\u0006tw-\u0003\u0003\u0002(\u0006\u0005&AB(cU\u0016\u001cG\u000fK\u0004 \u0003W\u000b\t,a-\u0011\t\u0005\u0015\u0014QV\u0005\u0005\u0003_\u000b9G\u0001\u0006QCJ\fW.\u001a;feN\fAA\\1nK\u0006\u0012\u0011QW\u0001@'R\fG/\u001a\"bG.,g\u000eZ\u001f|aud\u0003%V:f)&lWm\u001d;b[BdEO\u001f\u0011>Am\fT\u0010\f\u0011F]\u0006\u0014G.Z!ts:\u001c7\u000b^1uK\u0002j\u0004e\u001f\u001a~\u0001")
public class WindowJoinITCase
extends StreamingWithStateTestBase {
    private final boolean useTimestampLtz;
    private final boolean enableAsyncState;
    private final ZoneId SHANGHAI_ZONE;

    @Parameters(name="StateBackend={0}, UseTimestampLtz = {1}, EnableAsyncState = {2}")
    public static Collection<Object[]> parameters() {
        return WindowJoinITCase$.MODULE$.parameters();
    }

    public ZoneId SHANGHAI_ZONE() {
        return this.SHANGHAI_ZONE;
    }

    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixeddelay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)BoxesRunTime.boxToInteger((int)1));
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(0L));
        this.env().configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
        FailingCollectionSource.reset();
        this.tEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_STATE_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)this.enableAsyncState));
        String dataId1 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        String dataIdWithLtz = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithLtzInShanghai());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(460).append("\n         |CREATE TABLE T1 (\n         | `ts` ").append((Object)(this.useTimestampLtz ? "BIGINT" : "STRING")).append(",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)")).append(",\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append((Object)(this.useTimestampLtz ? dataIdWithLtz : dataId1)).append("',\n         | 'failing-source' = 'true'\n         |)\n         |").toString())).stripMargin());
        String dataId2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithTimestamp());
        String dataIdWithLtz2 = TestValuesTableFactory.registerData(TestData$.MODULE$.windowData2WithLtzInShanghai());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(460).append("\n         |CREATE TABLE T2 (\n         | `ts` ").append((Object)(this.useTimestampLtz ? "BIGINT" : "STRING")).append(",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(`ts`, 3)" : "TO_TIMESTAMP(`ts`)")).append(",\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '").append((Object)(this.useTimestampLtz ? dataIdWithLtz2 : dataId2)).append("',\n         | 'failing-source' = 'true'\n         |)\n         |").toString())).stripMargin());
        this.tEnv().getConfig().setLocalTimeZone(this.SHANGHAI_ZONE());
    }

    @TestTemplate
    public void testInnerJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)Nil$.MODULE$)));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testInnerJoinOnWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(768).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b", (List)Nil$.MODULE$)))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testInnerJoinOnWTFWithOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(809).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime),INTERVAL '5' SECOND, INTERVAL '1' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-09T16:00:10.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-09T16:00:20.999Z,4,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-09T16:00:35.999Z,1,Comment#3,b", (List)Nil$.MODULE$)))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:06,2020-10-10T00:00:11,2020-10-10T00:00:10.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:16,2020-10-10T00:00:21,2020-10-10T00:00:20.999,4,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:31,2020-10-10T00:00:36,2020-10-10T00:00:35.999,1,Comment#3,b", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testInnerJoinOnWTFWithNegativeOffset() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(812).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '-1' SECOND))\n         |)R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` = R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-09T16:00:08.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-09T16:00:18.999Z,4,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-09T16:00:38.999Z,1,Comment#3,b", (List)Nil$.MODULE$)))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:04,2020-10-10T00:00:09,2020-10-10T00:00:08.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:14,2020-10-10T00:00:19,2020-10-10T00:00:18.999,4,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:34,2020-10-10T00:00:39,2020-10-10T00:00:38.999,1,Comment#3,b", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testInnerJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testInnerJoinWithIsNotDistinctFromOnWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(788).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time,\n         |  R.`int`,\n         |  R.`string`,\n         |  R.`name`\n         |FROM (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L\n         |JOIN (\n         |SELECT *\n         |FROM TABLE(\n         |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) R\n         |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n         |L.`name` IS NOT DISTINCT from R.`name`\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,3,Hello,b", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z,6,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z,4,Hi,b", (List)new .colon.colon((Object)"2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,7,null,null", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z,1,Comment#3,b", (List)Nil$.MODULE$))))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,3,Hello,b", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999,6,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999,4,Hi,b", (List)new .colon.colon((Object)"2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,7,null,null", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999,1,Comment#3,b", (List)Nil$.MODULE$)))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSemiJoinExists() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE EXISTS (\n        |SELECT * FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n        |        AND L.`name` = R.`name`)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1", (List)Nil$.MODULE$)));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSemiJoinExistsWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(714).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE EXISTS (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`)\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z", (List)Nil$.MODULE$)))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSemiJoinIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE L.`name` IN (\n        |SELECT `name` FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1", (List)Nil$.MODULE$)));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testSemiJoinIN_WTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(724).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE L.`name` IN (\n         |  SELECT `name`\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`)\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:06Z,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:07Z,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:16Z,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-09T16:00:19.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:34Z,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z", (List)Nil$.MODULE$)))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)new .colon.colon((Object)"2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)new .colon.colon((Object)"2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", (List)new .colon.colon((Object)"2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", (List)Nil$.MODULE$))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testAntiJoinNotExists() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE NOT EXISTS (\n        |SELECT * FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n        |        AND L.`name` = R.`name`)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,0", (List)Nil$.MODULE$)));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testAntiJoinNotExistsWTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(748).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE NOT EXISTS (\n         |  SELECT *\n         |    FROM TABLE(\n         |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end\n         |        AND L.`name` = R.`name`) AND L.`float` IS NOT NULL\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:32Z,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-09T16:00:34.999Z", (List)Nil$.MODULE$)))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)new .colon.colon((Object)"2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testAntiJoinNotIN() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM (\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv1\n        |    FROM TABLE(\n        |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |) L WHERE L.`name` NOT IN (\n        |SELECT `name` FROM(\n        |  SELECT\n        |    `name`,\n        |    window_start,\n        |    window_end,\n        |    COUNT(DISTINCT `string`) as uv2\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |) R\n        |  WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1", (List)Nil$.MODULE$));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testAntiJoinNotIN_WTF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(714).append("\n         |SELECT\n         |  ").append((Object)(this.useTimestampLtz ? "TO_TIMESTAMP_LTZ(L.`ts`, 3)" : "TO_TIMESTAMP(L.`ts`)")).append(",\n         |  L.`int`,\n         |  L.`double`,\n         |  L.`float`,\n         |  L.`bigdec`,\n         |  L.`string`,\n         |  L.`name`,\n         |  CAST(L.`rowtime` AS STRING),\n         |  L.window_start,\n         |  L.window_end,\n         |  L.window_time\n         |  FROM (\n         |  SELECT *\n         |    FROM TABLE(\n         |    TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |) L WHERE L.`name` NOT IN (\n         |SELECT `name`\n         |FROM\n         |TABLE(TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) R\n         |WHERE L.window_start = R.window_start AND L.window_end = R.window_end) AND\n         | L.`float` IS NOT NULL\n         |").toString())).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = this.useTimestampLtz ? (Seq)new .colon.colon((Object)"2020-10-09T16:00:01Z,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:02Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:03Z,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:04Z,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-09T16:00:04.999Z", (List)new .colon.colon((Object)"2020-10-09T16:00:08Z,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-09T16:00:09.999Z", (List)Nil$.MODULE$))))) : (Seq)new .colon.colon((Object)"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:02,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:02.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:03,2,2.0,2.0,2.22,Comment#1,a,2020-10-10 00:00:03.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", (List)new .colon.colon((Object)"2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", (List)Nil$.MODULE$)))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testLeftJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |LEFT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,null", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testLeftJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |LEFT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"a,2020-10-10T00:00,2020-10-10T00:00:05,2,null", (List)new .colon.colon((Object)"a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,null", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testRightJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |RIGHT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", (List)new .colon.colon((Object)"null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testRightJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, R.window_start, R.window_end, uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |RIGHT JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)new .colon.colon((Object)"null,2020-10-10T00:00,2020-10-10T00:00:05,null,2", (List)new .colon.colon((Object)"null,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", (List)new .colon.colon((Object)"b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", (List)new .colon.colon((Object)"b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", (List)new .colon.colon((Object)"null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0", (List)Nil$.MODULE$))))));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testOuterJoin() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n        |uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |FULL OUTER JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.`name` = R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,null,null,0,null", "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1", "null,null,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,0"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @TestTemplate
    public void testOuterJoinWithIsNotDistinctFrom() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT L.`name`, L.window_start, L.window_end, R.`name`, R.window_start, R.window_end,\n        |uv1, uv2\n        |FROM (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv1\n        |FROM TABLE(\n        |   TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) L\n        |FULL OUTER JOIN (\n        |SELECT\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  COUNT(DISTINCT `string`) as uv2\n        |FROM TABLE(\n        |   TUMBLE(TABLE T2, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |GROUP BY `name`, window_start, window_end\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |   L.`name` IS NOT DISTINCT from R.`name`\n        |")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toDataStream().addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"a,2020-10-10T00:00,2020-10-10T00:00:05,null,null,null,2,null", "a,2020-10-10T00:00:05,2020-10-10T00:00:10,null,null,null,1,null", "b,2020-10-10T00:00:05,2020-10-10T00:00:10,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,2", "b,2020-10-10T00:00:15,2020-10-10T00:00:20,b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,1", "b,2020-10-10T00:00:30,2020-10-10T00:00:35,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,1", "null,2020-10-10T00:00:30,2020-10-10T00:00:35,null,2020-10-10T00:00:30,2020-10-10T00:00:35,0,0", "null,null,null,a1,2020-10-10T00:00,2020-10-10T00:00:05,null,2", "null,null,null,a1,2020-10-10T00:00:05,2020-10-10T00:00:10,null,1"}));
        Assertions.assertThat((String)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n")).isEqualTo(((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    public WindowJoinITCase(StreamingWithStateTestBase.StateBackendMode mode, boolean useTimestampLtz, boolean enableAsyncState) {
        this.useTimestampLtz = useTimestampLtz;
        this.enableAsyncState = enableAsyncState;
        super(mode);
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
    }
}

