/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.logical.LogicalJoin;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.api.typeutils.CaseClassTypeInfo;
import org.apache.flink.table.api.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
import org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$;
import org.apache.flink.table.planner.plan.utils.IntervalJoinUtil$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil$;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005uc\u0001\u0002\u0012$\u0001YBQ!\u0010\u0001\u0005\u0002yBq!\u0011\u0001C\u0002\u0013%!\t\u0003\u0004G\u0001\u0001\u0006Ia\u0011\u0005\u0006\u000f\u0002!\t\u0001\u0013\u0005\u00065\u0002!\t\u0001\u0013\u0005\u00069\u0002!\t\u0001\u0013\u0005\u0006=\u0002!\t\u0001\u0013\u0005\u0006A\u0002!\t\u0001\u0013\u0005\u0006E\u0002!\t\u0001\u0013\u0005\u0006I\u0002!\t\u0001\u0013\u0005\u0006M\u0002!\t\u0001\u0013\u0005\u0006Q\u0002!\t\u0001\u0013\u0005\u0006U\u0002!\t\u0001\u0013\u0005\u0006Y\u0002!\t\u0001\u0013\u0005\u0006]\u0002!\t\u0001\u0013\u0005\u0006a\u0002!\t\u0001\u0013\u0005\u0006e\u0002!\t\u0001\u0013\u0005\u0006i\u0002!\t\u0001\u0013\u0005\u0006m\u0002!\t\u0001\u0013\u0005\u0006q\u0002!\t\u0001\u0013\u0005\u0006u\u0002!\t\u0001\u0013\u0005\u0006y\u0002!\t\u0001\u0013\u0005\u0006}\u0002!\t\u0001\u0013\u0005\u0007\u0003\u0003\u0001A\u0011\u0001%\t\r\u0005\u0015\u0001\u0001\"\u0001I\u0011\u0019\tI\u0001\u0001C\u0001\u0011\"1\u0011Q\u0002\u0001\u0005\u0002!Ca!!\u0005\u0001\t\u0003A\u0005BBA\u000b\u0001\u0011\u0005\u0001\n\u0003\u0004\u0002\u001a\u0001!\t\u0001\u0013\u0005\u0007\u0003;\u0001A\u0011\u0001%\t\u000f\u0005\u0005\u0002\u0001\"\u0003\u0002$!9\u0011\u0011\u000b\u0001\u0005\n\u0005M#\u0001E%oi\u0016\u0014h/\u00197K_&tG+Z:u\u0015\t!S%\u0001\u0003k_&t'B\u0001\u0014(\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003Q%\naa\u001d;sK\u0006l'B\u0001\u0016,\u0003\u0011\u0001H.\u00198\u000b\u00051j\u0013a\u00029mC:tWM\u001d\u0006\u0003]=\nQ\u0001^1cY\u0016T!\u0001M\u0019\u0002\u000b\u0019d\u0017N\\6\u000b\u0005I\u001a\u0014AB1qC\u000eDWMC\u00015\u0003\ry'oZ\u0002\u0001'\t\u0001q\u0007\u0005\u00029w5\t\u0011H\u0003\u0002;W\u0005)Q\u000f^5mg&\u0011A(\u000f\u0002\u000e)\u0006\u0014G.\u001a+fgR\u0014\u0015m]3\u0002\rqJg.\u001b;?)\u0005y\u0004C\u0001!\u0001\u001b\u0005\u0019\u0013\u0001B;uS2,\u0012a\u0011\t\u0003q\u0011K!!R\u001d\u0003'M#(/Z1n)\u0006\u0014G.\u001a+fgR,F/\u001b7\u0002\u000bU$\u0018\u000e\u001c\u0011\u0002GQ,7\u000f^%oi\u0016
\u0014\u0018M\u001e7K_&t7+\u001b8hY\u0016$\u0016.\\3D_:$\u0017\u000e^5p]R\t\u0011\n\u0005\u0002K\u001b6\t1JC\u0001M\u0003\u0015\u00198-\u00197b\u0013\tq5J\u0001\u0003V]&$\bF\u0001\u0003Q!\t\t\u0006,D\u0001S\u0015\t\u0019F+A\u0002ba&T!!\u0016,\u0002\u000f),\b/\u001b;fe*\u0011qkM\u0001\u0006UVt\u0017\u000e^\u0005\u00033J\u0013A\u0001V3ti\u0006qB/Z:u\u0013:$XM]1wC2$\u0015N\u001a4US6,\u0017J\u001c3jG\u0006$xN\u001d\u0015\u0003\u000bA\u000b\u0011\u0005^3ti&sG/\u001a:wC2Tu.\u001b8P]\u0012KgM\u001a*poRKW.\u001a+za\u0016D#A\u0002)\u00029Q,7\u000f^%oi\u0016\u0014\u0018M^1m\u001d>$8I\u001c4D_:$\u0017\u000e^5p]\"\u0012q\u0001U\u0001\u001fi\u0016\u001cHOT8S_^$\u0018.\\3BiR\u0014\u0018NY;uK&s'+Z:vYRD#\u0001\u0003)\u0002aQ,7\u000f^,j]\u0012|woT;uKJTu.\u001b8XSRD\u0007+\u001f;i_:4UO\\2uS>t\u0017J\\\"p]\u0012LG/[8oQ\tI\u0001+A\u0014uKN$\bK]8dKN\u001c\u0018N\\4US6,\u0017J\u001c8fe*{\u0017N\\,ji\"|en\u00117bkN,\u0007F\u0001\u0006Q\u0003)\"Xm\u001d;Qe>\u001cWm]:j]\u001e$\u0016.\\3J]:,'OS8j]^KG\u000f[,iKJ,7\t\\1vg\u0016D#a\u0003)\u0002aQ,7\u000f\u001e)s_\u000e,7o]5oORKW.Z%o]\u0016\u0014(j\\5o/&$\bn\\;u\u000bF,\u0018\r\\\"p]\u0012LG/[8oQ\ta\u0001+\u0001\u0011uKN$(k\\<US6,\u0017J\u001c8fe*{\u0017N\\,ji\"|en\u00117bkN,\u0007FA\u0007Q\u0003\u0015\"Xm\u001d;J]R,'O^1m\u0015>Lgn\u00148US6,7\u000f^1na2#(PU8xi&lW\r\u000b\u0002\u000f!\u0006IC/Z:u%><H+[7f\u0013:tWM\u001d&pS:<\u0016\u000e\u001e5pkR,\u0015/^1m\u0007>tG-\u001b;j_:D#a\u0004)\u0002GQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o/&$\bn\u00165fe\u0016\u001cE.Y;tK\"\u0012\u0001\u0003U\u0001\u0019i\u0016\u001cHOS8j]^KG\u000f[#rk&\u0004&o\\2US6,\u0007FA\tQ\u0003]!Xm\u001d;K_&tw+\u001b;i\u000bF,\u0018NU8x)&lW\r\u000b\u0002\u0013!\u00069B/Z:u\u0015>LgnV5uQ:+H\u000e\u001c'ji\u0016\u0014\u0018\r\u001c\u0015\u0003'A\u000bq\u0006^3tiJ{w\u000fV5nK&sg.\u001a:K_&t\u0017I\u001c3XS:$wn^!hOJ,w-\u0019;j_:|eNR5sgRD#\u0001\u0006)\u0002aQ,7\u000f\u001e*poRKW.Z%o]\u0016\u0014(j\\5o\u0003:$w+\u001b8e_^\fum\u001a:fO\u0006$\u0018n\u001c8P]N+7m\u001c8eQ\t)\u0002+
A\ruKN$\bK]8d)&lW\rT3gi>+H/\u001a:K_&t\u0007F\u0001\fQ\u0003a!Xm\u001d;S_^$\u0016.\\3MK\u001a$x*\u001e;fe*{\u0017N\u001c\u0015\u0003/A\u000b!\u0004^3tiB\u0013xn\u0019+j[\u0016\u0014\u0016n\u001a5u\u001fV$XM\u001d&pS:D#\u0001\u0007)\u00023Q,7\u000f\u001e*poRKW.\u001a*jO\"$x*\u001e;fe*{\u0017N\u001c\u0015\u00033A\u000b\u0011\u0004^3tiB\u0013xn\u0019+j[\u00164U\u000f\u001c7PkR,'OS8j]\"\u0012!\u0004U\u0001\u0019i\u0016\u001cHOU8x)&lWMR;mY>+H/\u001a:K_&t\u0007FA\u000eQ\u0003A!Xm\u001d;PkR,'OS8j]>\u0003H\u000f\u000b\u0002\u001d!\u0006!B/Z:u\u0015>Lg\u000eV5nK\n{WO\u001c3befD#!\b)\u0002=Q,7\u000f\u001e&pS:\u0014V-\\1j]\u000e{g\u000eZ5uS>t7i\u001c8wKJ$\bF\u0001\u0010Q\u0003e!Xm\u001d;GC2d'-Y2l)>\u0014VmZ;mCJTu.\u001b8)\u0005}\u0001\u0016A\u0005<fe&4\u0017\u0010V5nK\n{WO\u001c3bef$\u0012\"SA\u0013\u0003\u007f\tI%!\u0014\t\u000f\u0005\u001d\u0002\u00051\u0001\u0002*\u0005\u0001B/[7f\u0007>tG-\u001b;j_:\u001c\u0016\u000f\u001c\t\u0005\u0003W\tID\u0004\u0003\u0002.\u0005U\u0002cAA\u0018\u00176\u0011\u0011\u0011\u0007\u0006\u0004\u0003g)\u0014A\u0002\u001fs_>$h(C\u0002\u00028-\u000ba\u0001\u0015:fI\u00164\u0017\u0002BA\u001e\u0003{\u0011aa\u0015;sS:<'bAA\u001c\u0017\"9\u0011\u0011\t\u0011A\u0002\u0005\r\u0013aC3ya2+g\r^*ju\u0016\u00042ASA#\u0013\r\t9e\u0013\u0002\u0005\u0019>tw\rC\u0004\u0002L\u0001\u0002\r!a\u0011\u0002\u0019\u0015D\bOU5hQR\u001c\u0016N_3\t\u000f\u0005=\u0003\u00051\u0001\u0002*\u0005YQ\r\u001f9US6,G+\u001f9f\u0003q1XM]5gsJ+W.Y5o\u0007>tG-\u001b;j_:\u001cuN\u001c<feR$R!SA+\u00033Bq!a\u0016\"\u0001\u0004\tI#\u0001\u0005tc2\fV/\u001a:z\u0011\u001d\tY&\ta\u0001\u0003S\t!#\u001a=qK\u000e$8i\u001c8eSRLwN\\*ue\u0002")
public class IntervalJoinTest
extends TableTestBase {
    /**
     * Planner test utility used by every test in this class. It is created once per test
     * instance via {@code streamTestUtil(...)}, passing the value returned by
     * {@code streamTestUtil$default$1()} (presumably the Scala default for the first argument —
     * confirm against TableTestBase).
     */
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());

    /** Returns the {@link StreamTableTestUtil} held in the {@code util} field. */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Runs plan verification for an inner join of MyTable and MyTable2 whose only time predicate
     * is a single lower bound on {@code proctime}.
     */
    @Test
    public void testInteravlJoinSingleTimeCondition() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.a FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an inner join whose lower bound compares {@code proctime} with
     * {@code proctime} while the upper bound compares {@code proctime} with {@code rowtime}.
     */
    @Test
    public void testInteravalDiffTimeIndicator() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.a FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.proctime > t2.proctime - INTERVAL '5' SECOND AND\n"
                        + "        |  t1.proctime < t2.rowtime + INTERVAL '5' SECOND\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Joins MyTable2 with MyTable3 on a {@code rowtime} interval and asserts that plan
     * verification fails with a message naming the mismatching rowtime types.
     */
    @Test
    public void testIntervalJoinOnDiffRowTimeType() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.a FROM MyTable2 t1 JOIN MyTable3 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime > t2.rowtime - INTERVAL '5' SECOND AND\n"
                        + "        |  t1.rowtime < t2.rowtime + INTERVAL '5' SECOND\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        final String expectedMessage =
                "Interval join with rowtime attribute requires same rowtime types, but the types are TIMESTAMP(3) *ROWTIME* and TIMESTAMP_LTZ(3) *ROWTIME*";
        Assertions.assertThatThrownBy(() -> util().verifyExecPlan(planQuery))
                .hasMessageContaining(expectedMessage);
    }

    /**
     * Runs plan verification for an inner join whose two time predicates are combined with
     * {@code OR} inside a parenthesised group rather than with {@code AND}.
     */
    @Test
    public void testInteravalNotCnfCondition() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.a FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  (t1.proctime > t2.proctime - INTERVAL '5' SECOND OR\n"
                        + "        |   t1.proctime < t2.rowtime + INTERVAL '5' SECOND)\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Selects every column of a {@code proctime}-bounded join of MyTable and MyTable2 and asserts
     * that plan verification throws a {@link TableException}.
     */
    @Test
    public void testNoRowtimeAttributeInResult() {
        final String marginSql =
                "\n"
                        + "        |SELECT * FROM MyTable t1, MyTable2 t2 WHERE\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND t2.proctime\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        Assertions.assertThatExceptionOfType(TableException.class)
                .isThrownBy(() -> util().verifyExecPlan(planQuery));
    }

    /**
     * Registers a {@code PythonScalarFunction} under the name {@code pyFunc}, uses it inside the
     * condition of a left outer interval join and asserts that plan verification throws a
     * {@link TableException}.
     */
    @Test
    public void testWindowOuterJoinWithPythonFunctionInCondition() {
        final UserDefinedFunction pythonFunction =
                new JavaUserDefinedScalarFunctions.PythonScalarFunction("pyFunc");
        util().addTemporarySystemFunction("pyFunc", pythonFunction);
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON\n"
                        + "        |    t1.a = t2.a AND pyFunc(t1.a, t2.a) = t1.a + t2.a AND\n"
                        + "        |    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        Assertions.assertThatExceptionOfType(TableException.class)
                .isThrownBy(() -> util().verifyExecPlan(planQuery));
    }

    /**
     * Runs plan verification for an inner join with an equality on {@code a} and a
     * {@code proctime BETWEEN} range of one hour either side, all placed in the {@code ON} clause.
     */
    @Test
    public void testProcessingTimeInnerJoinWithOnClause() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |    t1.a = t2.a AND\n"
                        + "        |    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a comma join of MyTable and MyTable2 whose equality and
     * {@code proctime BETWEEN} predicates are placed in the {@code WHERE} clause.
     */
    @Test
    public void testProcessingTimeInnerJoinWithWhereClause() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE\n"
                        + "        |    t1.a = t2.a AND\n"
                        + "        |    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an inner join whose {@code ON} clause holds only a
     * {@code proctime BETWEEN} range and no equality predicate.
     */
    @Test
    public void testProcessingTimeInnerJoinWithoutEqualCondition() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |    t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an inner join with an equality on {@code a} and a
     * {@code rowtime BETWEEN} range (10 seconds before to 1 hour after) in the {@code ON} clause.
     */
    @Test
    public void testRowTimeInnerJoinWithOnClause() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an inner join of MyTable3 and MyTable4 bounded by a
     * five-second {@code rowtime} window on both sides.
     */
    @Test
    public void testIntervalJoinOnTimestampLtzRowtime() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.a FROM MyTable3 t1 JOIN MyTable4 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime > t2.rowtime - INTERVAL '5' SECOND AND\n"
                        + "        |  t1.rowtime < t2.rowtime + INTERVAL '5' SECOND\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an inner join whose {@code ON} clause holds only a
     * {@code rowtime BETWEEN} range and no equality predicate.
     */
    @Test
    public void testRowTimeInnerJoinWithoutEqualCondition() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1 JOIN MyTable2 t2 ON\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a comma join whose equality and {@code rowtime BETWEEN}
     * predicates (10 minutes before to 1 hour after) are placed in the {@code WHERE} clause.
     */
    @Test
    public void testRowTimeInnerJoinWithWhereClause() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a join whose time predicate is an equality between the two
     * {@code proctime} columns.
     */
    @Test
    public void testJoinWithEquiProcTime() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE\n"
                        + "        |  t1.a = t2.a AND t1.proctime = t2.proctime\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a join whose time predicate is an equality between the two
     * {@code rowtime} columns.
     */
    @Test
    public void testJoinWithEquiRowTime() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE\n"
                        + "        |  t1.a = t2.a AND t1.rowtime = t2.rowtime\n"
                        + "        ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for an interval join over two CTEs in which one side's
     * {@code nullField} is {@code CAST(null AS BIGINT)} and the other's is
     * {@code CAST(12 AS BIGINT)}, with {@code nullField} taking part in the join equality.
     */
    @Test
    public void testJoinWithNullLiteral() {
        final String marginSql =
                "\n"
                        + "        |WITH T1 AS (SELECT a, b, c, proctime, CAST(null AS BIGINT) AS nullField FROM MyTable),\n"
                        + "        |     T2 AS (SELECT a, b, c, proctime, CAST(12 AS BIGINT) AS nullField FROM MyTable2)\n"
                        + "        |\n"
                        + "        |SELECT t2.a, t2.c, t1.c\n"
                        + "        |FROM T1 AS t1\n"
                        + "        |JOIN T2 AS t2 ON t1.a = t2.a AND t1.nullField = t2.nullField AND\n"
                        + "        |  t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n"
                        + "        |  t2.proctime + INTERVAL '5' SECOND\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code rowtime} interval join followed by a six-hour
     * {@code TUMBLE} aggregation keyed on the first input's {@code rowtime} and {@code b}.
     */
    @Test
    public void testRowTimeInnerJoinAndWindowAggregationOnFirst() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.b, SUM(t2.a) AS aSum, COUNT(t2.b) AS bCnt\n"
                        + "        |FROM MyTable t1, MyTable2 t2\n"
                        + "        |WHERE t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "        |GROUP BY TUMBLE(t1.rowtime, INTERVAL '6' HOUR), t1.b\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code rowtime} interval join followed by a six-hour
     * {@code TUMBLE} aggregation keyed on the second input's {@code rowtime} and {@code b}.
     */
    @Test
    public void testRowTimeInnerJoinAndWindowAggregationOnSecond() {
        final String marginSql =
                "\n"
                        + "        |SELECT t2.b, SUM(t1.a) AS aSum, COUNT(t1.b) AS bCnt\n"
                        + "        |FROM MyTable t1, MyTable2 t2\n"
                        + "        |WHERE t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "        |GROUP BY TUMBLE(t2.rowtime, INTERVAL '6' HOUR), t2.b\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code LEFT OUTER JOIN} bounded by a one-hour
     * {@code proctime BETWEEN} range on both sides.
     */
    @Test
    public void testProcTimeLeftOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code LEFT OUTER JOIN} bounded by a {@code rowtime BETWEEN}
     * range of 10 seconds before to 1 hour after.
     */
    @Test
    public void testRowTimeLeftOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code RIGHT OUTER JOIN} bounded by a one-hour
     * {@code proctime BETWEEN} range on both sides.
     */
    @Test
    public void testProcTimeRightOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code RIGHT OUTER JOIN} bounded by a {@code rowtime BETWEEN}
     * range of 10 seconds before to 1 hour after.
     */
    @Test
    public void testRowTimeRightOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a full outer join (written {@code Full OUTER JOIN} in the
     * query text) bounded by a one-hour {@code proctime BETWEEN} range on both sides.
     */
    @Test
    public void testProcTimeFullOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 Full OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code FULL OUTER JOIN} bounded by a {@code rowtime BETWEEN}
     * range of 10 seconds before to 1 hour after.
     */
    @Test
    public void testRowTimeFullOuterJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Runs plan verification for a {@code FULL OUTER JOIN} on a {@code rowtime} range that is
     * additionally filtered by a {@code WHERE t1.b LIKE t2.b} predicate over both inputs.
     */
    @Test
    public void testOuterJoinOpt() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a, t2.b\n"
                        + "        |FROM MyTable t1 FULL OUTER JOIN MyTable2 t2 ON\n"
                        + "        |  t1.a = t2.a AND\n"
                        + "        |  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR\n"
                        + "        |  WHERE t1.b LIKE t2.b\n"
                        + "      ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Checks the window bounds derived from a series of time predicates. Each case passes the
     * predicate text, the expected lower and upper bound, and the expected time type to
     * {@code verifyTimeBoundary}.
     */
    @Test
    public void testJoinTimeBoundary() {
        // BETWEEN with one hour either side, on proctime.
        verifyTimeBoundary(
                "t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR",
                -3_600_000L,
                3_600_000L,
                "proctime");
        // Strict comparisons with one second either side: expected bounds are -999 and 999.
        verifyTimeBoundary(
                "t1.proctime > t2.proctime - INTERVAL '1' SECOND AND t1.proctime < t2.proctime + INTERVAL '1' SECOND",
                -999L,
                999L,
                "proctime");
        // Non-strict comparisons with one second either side, on rowtime.
        verifyTimeBoundary(
                "t1.rowtime >= t2.rowtime - INTERVAL '1' SECOND AND t1.rowtime <= t2.rowtime + INTERVAL '1' SECOND",
                -1_000L,
                1_000L,
                "rowtime");
        // Lower bound without any offset.
        verifyTimeBoundary(
                "t1.rowtime >= t2.rowtime AND t1.rowtime <= t2.rowtime + INTERVAL '1' SECOND",
                0L,
                1_000L,
                "rowtime");
        // Both bounds are positive offsets.
        verifyTimeBoundary(
                "t1.rowtime >= t2.rowtime + INTERVAL '1' SECOND AND t1.rowtime <= t2.rowtime + INTERVAL '10' SECOND",
                1_000L,
                10_000L,
                "rowtime");
        // Operands written with the t2 expression on the left-hand side.
        verifyTimeBoundary(
                "t2.rowtime - INTERVAL '1' SECOND <= t1.rowtime AND t2.rowtime + INTERVAL '10' SECOND >= t1.rowtime",
                -1_000L,
                10_000L,
                "rowtime");
        // Interval arithmetic on both sides of the lower-bound comparison.
        verifyTimeBoundary(
                "t1.rowtime - INTERVAL '2' SECOND >= t2.rowtime + INTERVAL '1' SECOND - INTERVAL '10' SECOND AND t1.rowtime <= t2.rowtime + INTERVAL '10' SECOND",
                -7_000L,
                10_000L,
                "rowtime");
        // Both bounds are negative offsets.
        verifyTimeBoundary(
                "t1.rowtime >= t2.rowtime - INTERVAL '10' SECOND AND t1.rowtime <= t2.rowtime - INTERVAL '5' SECOND",
                -10_000L,
                -5_000L,
                "rowtime");
    }

    /**
     * Registers four data-stream tables (MyTable3 to MyTable6) and passes three interval-join
     * queries, each with an expected condition string, to {@code verifyRemainConditionConvert}:
     * {@code ">($2, $6)"} for the two queries that add {@code t1.c > t2.c}, and the empty string
     * for the query that has only the equality and time predicates.
     *
     * <p>NOTE(review): the anonymous {@code CaseClassTypeInfo} subclasses below are decompiler
     * output. They reference names such as {@code fieldSerializers$3} and {@code $anon$5} and
     * contain pseudo {@code MethodHandle} literals, so this method is not compilable Java as
     * shown; presumably the original Scala source relied on generated type information —
     * confirm against the Scala source before editing.
     */
    @Test
    public void testJoinRemainConditionConvert() {
        // MyTable3: fields a, rowtime (declared via .rowtime()), c, proctime (declared via .proctime()).
        this.util().addDataStream("MyTable3", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime(), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime()}), new CaseClassTypeInfo<Tuple3<Object, Object, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, Object>>(this, fieldSerializers){

                    public Tuple3<Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$5 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // MyTable4: same field list as MyTable3.
        this.util().addDataStream("MyTable4", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime(), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime()}), new CaseClassTypeInfo<Tuple3<Object, Object, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$7 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$4[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, Object>>(this, fieldSerializers){

                    public Tuple3<Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$4(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$7 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Case 1: rowtime bounds plus the extra predicate t1.c > t2.c; expected string is ">($2, $6)".
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.a, t2.c FROM MyTable3 AS t1 JOIN MyTable4 AS t2 ON\n        |    t1.a = t2.a AND\n        |    t1.rowtime >= t2.rowtime - INTERVAL '10' SECOND AND\n        |    t1.rowtime <= t2.rowtime - INTERVAL '5' SECOND AND\n        |    t1.c > t2.c\n      ")).stripMargin();
        this.verifyRemainConditionConvert(query, ">($2, $6)");
        // Case 2: same join without the extra predicate; expected string is empty.
        String query1 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.a, t2.c FROM MyTable3 as t1 JOIN MyTable4 AS t2 ON\n        |    t1.a = t2.a AND\n        |    t1.rowtime >= t2.rowtime - INTERVAL '10' SECOND AND\n        |    t1.rowtime <= t2.rowtime - INTERVAL '5' SECOND\n      ")).stripMargin();
        this.verifyRemainConditionConvert(query1, "");
        // MyTable5: fields a, b, c, proctime (declared via .proctime()); no rowtime field.
        this.util().addDataStream("MyTable5", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime()}), new CaseClassTypeInfo<Tuple3<Object, Object, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$9 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$5[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, Object>>(this, fieldSerializers){

                    public Tuple3<Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$5(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$9 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // MyTable6: same field list as MyTable5.
        this.util().addDataStream("MyTable6", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime()}), new CaseClassTypeInfo<Tuple3<Object, Object, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$11 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, Object, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$6[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, Object>>(this, fieldSerializers){

                    public Tuple3<Object, Object, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[1])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$6(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$11 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Case 3: proctime bounds plus the extra predicate t1.c > t2.c; expected string is ">($2, $6)".
        String query2 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.a, t2.c FROM MyTable5 AS t1 JOIN MyTable6 AS t2 ON\n        |    t1.a = t2.a AND\n        |    t1.proctime >= t2.proctime - INTERVAL '10' SECOND AND\n        |    t1.proctime <= t2.proctime - INTERVAL '5' SECOND AND\n        |    t1.c > t2.c\n      ")).stripMargin();
        this.verifyRemainConditionConvert(query2, ">($2, $6)");
    }

    /**
     * Runs plan verification for an {@code IN} sub-query that is correlated on {@code b} and on a
     * five-minute {@code rowtime} range and is grouped by {@code t2.a}. The test name indicates
     * that a regular join is the expected outcome; the expectation itself is not visible here.
     */
    @Test
    public void testFallbackToRegularJoin() {
        final String marginSql =
                "\n"
                        + "        |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (\n"
                        + "        | SELECT t2.a FROM MyTable2 t2\n"
                        + "        |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE\n"
                        + "        |   GROUP BY t2.a\n"
                        + "        |)\n"
                        + "    ";
        final String planQuery = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();
        util().verifyExecPlan(planQuery);
    }

    /**
     * Builds a join of MyTable and MyTable2 around the given time predicate, extracts the window
     * bounds from the condition of the resulting logical join, and asserts them.
     *
     * @param timeConditionSql time predicate appended after the fixed {@code t1.a = t2.a} equality
     * @param expLeftSize expected value of the bounds' {@code getLeftLowerBound()}
     * @param expRightSize expected value of the bounds' {@code getLeftUpperBound()}
     * @param expTimeType {@code "rowtime"} if the bounds report event time, otherwise
     *     {@code "proctime"}
     */
    private void verifyTimeBoundary(String timeConditionSql, long expLeftSize, long expRightSize, String expTimeType) {
        final String marginSql =
                "\n         |SELECT t1.a, t2.b FROM MyTable AS t1 JOIN MyTable2 AS t2 ON\n         |    t1.a = t2.a AND\n         |    "
                        + timeConditionSql
                        + "\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginSql)).stripMargin();

        // The join is taken from the first input of the converted plan root.
        final RelNode planRoot = TableTestUtil$.MODULE$.toRelNode(util().tableEnv().sqlQuery(query));
        final LogicalJoin join = (LogicalJoin) planRoot.getInput(0);
        final RexNode joinCondition = join.getCondition();

        final Tuple2<?, ?> extracted = IntervalJoinUtil$.MODULE$.extractWindowBoundsFromPredicate(
                joinCondition,
                join.getLeft().getRowType().getFieldCount(),
                join.getRowType(),
                join.getCluster().getRexBuilder(),
                util().tableEnv().getConfig(),
                Thread.currentThread().getContextClassLoader());
        if (extracted == null) {
            throw new MatchError(extracted);
        }

        // Only the first tuple element (the optional window bounds) is inspected.
        final Option<?> maybeBounds = (Option<?>) extracted._1();
        final IntervalJoinSpec.WindowBounds bounds = (IntervalJoinSpec.WindowBounds) maybeBounds.get();

        final String actualTimeType;
        if (bounds.isEventTime()) {
            actualTimeType = "rowtime";
        } else {
            actualTimeType = "proctime";
        }
        Assertions.assertThat((long) bounds.getLeftLowerBound()).isEqualTo(expLeftSize);
        Assertions.assertThat((long) bounds.getLeftUpperBound()).isEqualTo(expRightSize);
        Assertions.assertThat(actualTimeType).isEqualTo(expTimeType);
    }

    /**
     * Translates the given query, takes the non-equi remainder of its join condition, runs the
     * window-bound extraction on that remainder and checks the string form of the condition that
     * is left over.
     *
     * @param sqlQuery complete SQL statement whose plan has the join as input 0 of its root
     * @param expectConditionStr expected {@code toString()} of the leftover condition; an empty
     *     string is used for comparison when no condition is left
     * @throws MatchError if the extraction returns {@code null} instead of a tuple
     */
    private void verifyRemainConditionConvert(String sqlQuery, String expectConditionStr) {
        Table joined = this.util().tableEnv().sqlQuery(sqlQuery);
        RelNode planRoot = TableTestUtil$.MODULE$.toRelNode(joined);
        LogicalJoin join = (LogicalJoin)planRoot.getInput(0);

        // Whatever analyzeCondition() could not express as equi-keys is handed to the extractor.
        JoinInfo analyzed = join.analyzeCondition();
        RexNode nonEquiPart = analyzed.getRemaining(join.getCluster().getRexBuilder());

        Tuple2 extracted = IntervalJoinUtil$.MODULE$.extractWindowBoundsFromPredicate(
                nonEquiPart,
                join.getLeft().getRowType().getFieldCount(),
                join.getRowType(),
                join.getCluster().getRexBuilder(),
                this.util().tableEnv().getConfig(),
                Thread.currentThread().getContextClassLoader());
        if (extracted == null) {
            throw new MatchError((Object)extracted);
        }

        // Second tuple component: the optional condition remaining after the extraction.
        Option leftover = (Option)extracted._2();
        String rendered = leftover.isDefined() ? leftover.get().toString() : "";
        Assertions.assertThat(rendered).isEqualTo(expectConditionStr);
    }

    /**
     * Sets up the tables used by the tests in this class.
     *
     * <p>Two data streams, {@code MyTable} and {@code MyTable2}, are registered through
     * {@code util().addDataStream} with the fields {@code a}, {@code b}, {@code c},
     * {@code proctime} and {@code rowtime}. Two further tables, {@code MyTable3} and
     * {@code MyTable4}, are created through {@code executeSql} with DDL statements.
     *
     * <p>NOTE(review): the file header states this source was decompiled with CFR. Several
     * constructs below ({@code fieldSerializers$1}, {@code $anon$1}, the contents of the
     * {@code MethodHandle[]} initialisers) look like decompiler residue rather than compilable
     * Java - confirm against the original Scala source before editing.
     */
    public IntervalJoinTest() {
        // Register "MyTable": fields a, b, c plus a proctime() and a rowtime() attribute, typed by an anonymous CaseClassTypeInfo over Tuple3.
        this.util().addDataStream("MyTable", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor that exposes the `types` field of the given instance to the lambda below.
            public /* synthetic */ TypeInformation[] protected$types($anon$1 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (getArity() entries) and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // Fill slot i with the serializer created by the i-th field type.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): this anonymous subclass is assigned to `unused` and never returned; the
                // method returns a plain ScalaCaseClassSerializer instead. Possibly a decompiler artifact - confirm.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple from raw field values: fields[0] as int, fields[1] as String, fields[2] as long.
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Lambda deserialization hook; the MethodHandle[] initialiser is printed as a signature, not as valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$1 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Register "MyTable2" with the same field list and the same Tuple3 layout as "MyTable".
        this.util().addDataStream("MyTable2", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "proctime")).proctime(), (Expression)package$.MODULE$.UnresolvedFieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "rowtime")).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            // Synthetic accessor that exposes the `types` field of the given instance to the lambda below.
            public /* synthetic */ TypeInformation[] protected$types($anon$3 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (getArity() entries) and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(SerializerConfig serializerConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // Fill slot i with the serializer created by the i-th field type.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(serializerConfig);
                });
                // NOTE(review): as above, the anonymous subclass is stored in `unused` and not returned - confirm.
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    // Rebuilds the tuple from raw field values: fields[0] as int, fields[1] as String, fields[2] as long.
                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            // Lambda deserialization hook; the MethodHandle[] initialiser is printed as a signature, not as valid Java.
            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.stream.sql.join.IntervalJoinTest$$anon$3 org.apache.flink.api.common.typeutils.TypeSerializer[] org.apache.flink.api.common.serialization.SerializerConfig int )}, serializedLambda);
            }
        });
        // Create MyTable3 via DDL: columns a int, b bigint, c string, a computed rowtime column
        // (TO_TIMESTAMP_LTZ(b, 3)) with a watermark on it, using the 'values' connector with 'bounded' = 'false'.
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable3 (\n                              |  a int,\n                              |  b bigint,\n                              |  c string,\n                              |  rowtime as TO_TIMESTAMP_LTZ(b, 3),\n                              |  watermark for rowtime as rowtime\n                              |) WITH (\n                              |  'connector' = 'values',\n                              |  'bounded' = 'false'\n                              |)\n       ")).stripMargin());
        // Create MyTable4 with the same column, watermark and connector definition as MyTable3.
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable4 (\n                              |  a int,\n                              |  b bigint,\n                              |  c string,\n                              |  rowtime as TO_TIMESTAMP_LTZ(b, 3),\n                              |  watermark for rowtime as rowtime\n                              |) WITH (\n                              |  'connector' = 'values',\n                              |  'bounded' = 'false'\n                              |)\n       ")).stripMargin());
    }
}

