/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005-d\u0001B\u0017/\u0001\u0005CQ\u0001\u0013\u0001\u0005\u0002%Cq\u0001\u0014\u0001C\u0002\u0013%Q\n\u0003\u0004R\u0001\u0001\u0006IA\u0014\u0005\u0006%\u0002!\ta\u0015\u0005\u0006K\u0002!\ta\u0015\u0005\u0006O\u0002!\ta\u0015\u0005\u0006S\u0002!\ta\u0015\u0005\u0006W\u0002!\ta\u0015\u0005\u0006[\u0002!\ta\u0015\u0005\u0006_\u0002!\ta\u0015\u0005\u0006c\u0002!\ta\u0015\u0005\u0006g\u0002!\ta\u0015\u0005\u0006k\u0002!\ta\u0015\u0005\u0006o\u0002!\ta\u0015\u0005\u0006s\u0002!\ta\u0015\u0005\u0006w\u0002!\ta\u0015\u0005\u0006{\u0002!\ta\u0015\u0005\u0006\u007f\u0002!\ta\u0015\u0005\u0007\u0003\u0007\u0001A\u0011A*\t\r\u0005\u001d\u0001\u0001\"\u0001T\u0011\u0019\tY\u0001\u0001C\u0001'\"1\u0011q\u0002\u0001\u0005\u0002MCa!a\u0005\u0001\t\u0003\u0019\u0006BBA\f\u0001\u0011\u00051\u000b\u0003\u0004\u0002\u001c\u0001!\ta\u0015\u0005\u0007\u0003?\u0001A\u0011A*\t\r\u0005\r\u0002\u0001\"\u0001T\u0011\u0019\t9\u0003\u0001C\u0001'\"1\u00111\u0006\u0001\u0005\u0002MCa!a\f\u0001\t\u0003\u0019\u0006BBA\u001a\u0001\u0011\u00051\u000b\u0003\u0004\u00028\u0001!\ta\u0015\u0005\u0007\u0003w\u0001A\u0011A*\t\r\u0005}\u0002\u0001\"\u0001T\u0011\u0019\t\u0019\u0005\u0001C\u0001'\"1\u0011q\t\u0001\u0005\u0002MCa!a\u0013\u0001\t\u0003\u0019\u0006BBA(\u0001\u0011\u00051\u000b\u0003\u0004\u0002T\u0001!\ta\u0015\u0005\u0007\u0003/\u0002A\u0011A*\t\r\u0005m\u0003\u0001\"\u0001T\u0011\u0019\ty\u0006\u0001C\u0001'\"1\u00111\r\u0001\u0005\u0002MCa!a\u001a\u0001\t\u0003\u0019&AD,j]\u0012|wOS8j]R+7\u000f\u001e\u0006\u0003_A\nAA[8j]*\u0011\u0011GM\u0001\u0004gFd'BA\u001a5\u0003\u0019\u0019HO]3b[*\u0011QGN\u0001\u0005a2\fgN\u0003\u00028q\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u001d;\u0003\u0015!\u0018M\u00197f\u0015\tYD(A\u0003gY&t7N\u0003\u0002>}\u00051\u0011\r]1dQ\u0016T\u0011aP\u0001\u0004_J<7\u0001A\n\u0003\u0001\t\u0003\"a\u0011$\u000e\u0003\u0011S!!\u0012\u001c\u0002\u000bU$\u0018\u000e\\:\n\u0005\u001d#%!\u0004+bE2,G+Z:u\u0005\u0006\u001cX-\u0001\u0004=S:
LGO\u0010\u000b\u0002\u0015B\u00111\nA\u0007\u0002]\u0005!Q\u000f^5m+\u0005q\u0005CA\"P\u0013\t\u0001FIA\nTiJ,\u0017-\u001c+bE2,G+Z:u+RLG.A\u0003vi&d\u0007%\u0001\u001cuKN$8+[7qY&4\u0017\u0010V;nE2,w+\u001b8e_^$fK\u0012\"fM>\u0014XmV5oI><(j\\5o/&$\b\u000eV<p\u0007\u0006d7\rF\u0001U!\t)\u0006,D\u0001W\u0015\u00059\u0016!B:dC2\f\u0017BA-W\u0005\u0011)f.\u001b;)\u0005\u0011Y\u0006C\u0001/d\u001b\u0005i&B\u00010`\u0003\r\t\u0007/\u001b\u0006\u0003A\u0006\fqA[;qSR,'O\u0003\u0002c}\u0005)!.\u001e8ji&\u0011A-\u0018\u0002\u0005)\u0016\u001cH/A\u001cuKN$8+[7qY&4\u0017\u0010V;nE2,w+\u001b8e_^$fK\u0012\"fM>\u0014XmV5oI><(j\\5o/&$\b\u000eT3gi\u000e\u000bGn\u0019\u0015\u0003\u000bm\u000b\u0001\b^3tiNKW\u000e\u001d7jMf$V/\u001c2mK^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3po*{\u0017N\\,ji\"\u0014\u0016n\u001a5u\u0007\u0006d7\r\u000b\u0002\u00077\u0006YC/Z:u'&l\u0007\u000f\\5gsR+XN\u00197f/&tGm\\<U-\u001a\u0013UMZ8sK^Kg\u000eZ8x\u0015>Lg\u000e\u000b\u0002\b7\u0006\u0001C/Z:u/&tGm\\<K_&tw+\u001b;i_V$\bK]8kK\u000e$\u0018n\u001c8tQ\tA1,A\u0015uKN$XK\\:vaB|'\u000f^3e/&tGm\\<U-\u001a{F+^7cY\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0003\u0013m\u000b1\u0007^3tiNKW\u000e\u001d7jMfDu\u000e],j]\u0012|w\u000f\u0016,G\u0005\u00164wN]3XS:$wn\u001e&pS:<\u0016\u000e\u001e5Uo>\u001c\u0015\r\\2)\u0005)Y\u0016\u0001\u000e;fgR\u001c\u0016.\u001c9mS\u001aL\bj\u001c9XS:$wn\u001e+W\r\n+gm\u001c:f/&tGm\\<K_&tw+\u001b;i\u0019\u00164GoQ1mG\"\u00121bW\u00016i\u0016\u001cHoU5na2Lg-\u001f%pa^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3po*{\u0017N\\,ji\"\u0014\u0016n\u001a5u\u0007\u0006d7\r\u000b\u0002\r7\u0006AC/Z:u'&l\u0007\u000f\\5gs\"{\u0007oV5oI><HK\u0016$CK\u001a|'/Z,j]\u0012|wOS8j]\"\u0012QbW\u0001'i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI^Kg\u000eZ8x)Z3u\fS8q\u001f:\u0004&o\\2uS6,\u0007F\u0001\b\\\u0003a\"Xm\u001d;TS6\u0004H.\u001b4z\u0007VlW\u000f\\1uK^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3po*{\u0017N\\,ji\"$vo\\\"bY\u000eD#aD.\u0002sQ,7\u000f^*j[Bd\u0017NZ=Dk6,H.\u0019;f/&tGm\\<U-\u001
a\u0013UMZ8sK^Kg\u000eZ8x\u0015>LgnV5uQ2+g\r^\"bY\u000eD#\u0001E.\u0002uQ,7\u000f^*j[Bd\u0017NZ=Dk6,H.\u0019;f/&tGm\\<U-\u001a\u0013UMZ8sK^Kg\u000eZ8x\u0015>LgnV5uQJKw\r\u001b;DC2\u001c\u0007FA\t\\\u00035\"Xm\u001d;TS6\u0004H.\u001b4z\u0007VlW\u000f\\1uK^Kg\u000eZ8x)Z3%)\u001a4pe\u0016<\u0016N\u001c3po*{\u0017N\u001c\u0015\u0003%m\u000b1\u0006^3tiVs7/\u001e9q_J$X\rZ,j]\u0012|w\u000f\u0016,G?\u000e+X.\u001e7bi\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0003'm\u000bQ\u0003^3ti:{GoU1nK^Kg\u000eZ8x)f\u0004X\r\u000b\u0002\u00157\u0006)B/Z:u\u001d>$8+Y7f/&tGm\\<Ta\u0016\u001c\u0007FA\u000b\\\u0003q!Xm\u001d;O_R\u001c\u0016-\\3US6,\u0017\t\u001e;sS\n,H/\u001a+za\u0016D#AF.\u0002WQ,7\u000f^'jgN<\u0016N\u001c3po\u0016sG-\u00138D_:$\u0017\u000e^5p]\u001a{'\u000fV;nE2,w+\u001b8e_^D#aF.\u0002[Q,7\u000f^'jgN<\u0016N\u001c3poN#\u0018M\u001d;J]\u000e{g\u000eZ5uS>tgi\u001c:Uk6\u0014G.Z,j]\u0012|w\u000f\u000b\u0002\u00197\u0006AC/Z:u\u001b&\u001c8oV5oI><XI\u001c3J]\u000e{g\u000eZ5uS>tgi\u001c:I_B<\u0016N\u001c3po\"\u0012\u0011dW\u0001+i\u0016\u001cH/T5tg^Kg\u000eZ8x'R\f'\u000f^%o\u0007>tG-\u001b;j_:4uN\u001d%pa^Kg\u000eZ8xQ\tQ2,A\u0017uKN$X*[:t/&tGm\\<F]\u0012LenQ8oI&$\u0018n\u001c8G_J\u001cU/\\;mCR,w+\u001b8e_^D#aG.\u0002_Q,7\u000f^'jgN<\u0016N\u001c3poN#\u0018M\u001d;J]\u000e{g\u000eZ5uS>tgi\u001c:Dk6,H.\u0019;f/&tGm\\<)\u0005qY\u0016a\u0007;fgR|e\u000eV;nE2,w+\u001b8e_^\fum\u001a:fO\u0006$X\r\u000b\u0002\u001e7\u0006)C/Z:u\u001f:$V/\u001c2mK^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0003=m\u000b\u0001\u0004^3ti>s\u0007j\u001c9XS:$wn^!hOJ,w-\u0019;fQ\ty2,\u0001\u0012uKN$xJ\u001c%pa^Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016|e\u000e\u0015:pGRLW.\u001a\u0015\u0003Am\u000bQ\u0004^3ti>s7)^7vY\u0006$XmV5oI><\u0018iZ4sK\u001e\fG/\u001a\u0015\u0003Cm\u000bq\u0005^3ti>s7)^7vY\u0006$XmV5oI><\u0018iZ4sK\u001e\fG/Z(o!J|7\r^5nK\"\u0012!eW\u0001\u001ai\u0016\u001cHoV5oI><(j\\5o/&$\bNT8o\u000bF,\u0018\u000e\u000b\u0002$7\u00069C/Z:u)&lW-\u0011;ue&\u0014W\u000f^3Qe>\u0004\u00
18mZ1uK\u001a{'oV5oI><(j\\5oQ\t!3,\u0001\u0015uKN$H+[7f\u0003R$(/\u001b2vi\u0016\u0004&o\u001c9bO\u0006$XMR8s/&tGm\\<K_&t\u0017\u0007\u000b\u0002&7\u0006AC/Z:u/&tGm\\<Qe>\u0004XM\u001d;z!J|\u0007/Y4bi\u00164uN],j]\u0012|wOS8j]\"\u0012aeW\u0001\u000fi\u0016\u001cHoU3nS*{\u0017N\\%OQ\t93,A\u0007uKN$8+Z7j\u000bbL7\u000f\u001e\u0015\u0003Qm\u000b\u0011\u0003^3ti\u0006sG/\u001b&pS:tu\u000e^%OQ\tI3,\u0001\u000buKN$\u0018I\u001c;j\u0015>LgNT8u\u000bbL7\u000f\u001e\u0015\u0003Um\u000bQ\u0004^3ti*{\u0017N\\,ji\"L5OT8u\t&\u001cH/\u001b8di\u001a\u0013x.\u001c\u0015\u0003Wm\u000b1\u0003^3ti*{\u0017N\u001c+p\u001bVdG/[*j].D#\u0001L.")
public class WindowJoinTest
extends TableTestBase {
    // Test utility used by the tests below through util(). It is created once per test instance
    // by calling streamTestUtil(...) with the value returned by streamTestUtil$default$1()
    // (NOTE(review): presumably the Scala default-argument accessor inherited from TableTestBase — confirm).
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    /**
     * Accessor for the {@link StreamTableTestUtil} held in the {@code util} field.
     *
     * @return the test utility stored on this test instance
     */
    private StreamTableTestUtil util() {
        return util;
    }

    /**
     * Joins two 15-minute TUMBLE windows on {@code rowtime}, where both sides filter with
     * {@code c > 10}, on {@code window_start}, {@code window_end} and {@code a}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithTwoCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE windows on {@code rowtime}, where only the left side filters
     * with {@code c > 10}, and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithLeftCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE windows on {@code rowtime}, where only the right side filters
     * with {@code c > 10}, and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoinWithRightCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two unfiltered {@code SELECT *} sub-queries over 15-minute TUMBLE windows on
     * {@code rowtime} and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyTumbleWindowTVFBeforeWindowJoin() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE table functions directly (aliased {@code L} and {@code R},
     * no wrapping sub-query) and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testWindowJoinWithoutProjections() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM\n        |  TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS L\n        |JOIN\n        |  TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) AS R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE windows declared on {@code proctime} and expects
     * {@code verifyExplain} to fail with a {@link TableException} whose message contains
     * "Processing time Window Join is not supported yet.".
     */
    @Test
    public void testUnsupportedWindowTVF_TumbleOnProctime() {
        final String marginedQuery = "\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        // Assert the exception type through AssertJ. The previous `instanceof` on the assert
        // object only produced an unused boolean and never checked the thrown exception.
        Assertions.assertThatThrownBy(() -> util().verifyExplain(query))
                .hasMessageContaining("Processing time Window Join is not supported yet.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Joins two HOP windows on {@code rowtime} (intervals 5 and 10 minutes), where both sides
     * filter with {@code c > 10}, and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithTwoCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP windows on {@code rowtime} (intervals 5 and 10 minutes), where only the
     * left side filters with {@code c > 10}, and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithLeftCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP windows on {@code rowtime} (intervals 5 and 10 minutes), where only the
     * right side filters with {@code c > 10}, and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoinWithRightCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two unfiltered {@code SELECT *} sub-queries over HOP windows on {@code rowtime}
     * (intervals 5 and 10 minutes) and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyHopWindowTVFBeforeWindowJoin() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP windows declared on {@code proctime} (intervals 5 and 10 minutes) and
     * expects {@code verifyExplain} to fail with a {@link TableException} whose message contains
     * "Processing time Window Join is not supported yet.".
     */
    @Test
    public void testUnsupportedWindowTVF_HopOnProctime() {
        final String marginedQuery = "\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        // Assert the exception type through AssertJ. The previous `instanceof` on the assert
        // object only produced an unused boolean and never checked the thrown exception.
        Assertions.assertThatThrownBy(() -> util().verifyExplain(query))
                .hasMessageContaining("Processing time Window Join is not supported yet.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Joins two CUMULATE windows on {@code rowtime} (intervals 10 minutes and 1 hour), where
     * both sides filter with {@code c > 10}, and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithTwoCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE windows on {@code rowtime} (intervals 10 minutes and 1 hour), where
     * only the left side filters with {@code c > 10}, and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithLeftCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE windows on {@code rowtime} (intervals 10 minutes and 1 hour), where
     * only the right side filters with {@code c > 10}, and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoinWithRightCalc() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  WHERE c > 10\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two unfiltered {@code SELECT *} sub-queries over CUMULATE windows on {@code rowtime}
     * (intervals 10 minutes and 1 hour) and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testSimplifyCumulateWindowTVFBeforeWindowJoin() {
        final String marginedQuery = "\n        |SELECT *\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE windows declared on {@code proctime} (intervals 10 minutes and 1 hour)
     * and expects {@code verifyExplain} to fail with a {@link TableException} whose message
     * contains "Processing time Window Join is not supported yet.".
     */
    @Test
    public void testUnsupportedWindowTVF_CumulateOnProctime() {
        final String marginedQuery = "\n        |SELECT L.a, L.b, L.c, R.a, R.b, R.c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        // Assert the exception type through AssertJ. The previous `instanceof` on the assert
        // object only produced an unused boolean and never checked the thrown exception.
        Assertions.assertThatThrownBy(() -> util().verifyExplain(query))
                .hasMessageContaining("Processing time Window Join is not supported yet.")
                .isInstanceOf(TableException.class);
    }

    /**
     * Joins a HOP-window aggregate (left) with a CUMULATE-window aggregate (right) on
     * {@code window_start}, {@code window_end} and {@code a}, and hands the margin-stripped
     * query to {@code verifyRelPlan}.
     */
    @Test
    public void testNotSameWindowType() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE-window aggregates whose second interval differs (2 hours on the left,
     * 1 hour on the right) and hands the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testNotSameWindowSpec() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '2' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE-window aggregates whose time descriptors differ ({@code proctime} on
     * the left, {@code rowtime} on the right) and hands the margin-stripped query to
     * {@code verifyRelPlan}.
     */
    @Test
    public void testNotSameTimeAttributeType() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE-window aggregates with a condition that compares
     * {@code window_start} and {@code a} but omits {@code window_end}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowEndInConditionForTumbleWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE-window aggregates with a condition that compares
     * {@code window_end} and {@code a} but omits {@code window_start}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowStartInConditionForTumbleWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP-window aggregates (intervals 5 and 10 minutes) with a condition that
     * compares {@code window_start} and {@code a} but omits {@code window_end}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowEndInConditionForHopWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP-window aggregates (intervals 5 and 10 minutes) with a condition that
     * compares {@code window_end} and {@code a} but omits {@code window_start}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowStartInConditionForHopWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE-window aggregates (intervals 10 minutes and 1 hour) with a condition
     * that compares {@code window_start} and {@code a} but omits {@code window_end}, and hands
     * the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowEndInConditionForCumulateWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two CUMULATE-window aggregates (intervals 10 minutes and 1 hour) with a condition
     * that compares {@code window_end} and {@code a} but omits {@code window_start}, and hands
     * the margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testMissWindowStartInConditionForCumulateWindow() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE-window aggregates on {@code rowtime} using
     * {@code window_start}, {@code window_end} and {@code a}, and hands the margin-stripped
     * query to {@code verifyRelPlan}.
     */
    @Test
    public void testOnTumbleWindowAggregate() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two 15-minute TUMBLE-window aggregates declared on {@code proctime} using
     * {@code window_start}, {@code window_end} and {@code a}, and hands the margin-stripped
     * query to {@code verifyRelPlan}.
     */
    @Test
    public void testOnTumbleWindowAggregateOnProctime() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP-window aggregates on {@code rowtime} (intervals 5 and 10 minutes) using
     * {@code window_start}, {@code window_end} and {@code a}, and hands the margin-stripped
     * query to {@code verifyRelPlan}.
     */
    @Test
    public void testOnHopWindowAggregate() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Joins two HOP-window aggregates declared on {@code proctime} (intervals 5 and 10 minutes)
     * using {@code window_start}, {@code window_end} and {@code a}, and hands the
     * margin-stripped query to {@code verifyRelPlan}.
     */
    @Test
    public void testOnHopWindowAggregateOnProctime() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        // Strip the Scala-style '|' margins before planning.
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans an inner join between two CUMULATE-window aggregates that are windowed on
     * {@code rowtime}.
     *
     * <p>Each side aggregates {@code MyTable} / {@code MyTable2} grouped by {@code a} and the
     * window columns; the join condition equates {@code window_start}, {@code window_end} and
     * {@code a}. The margin-stripped statement is handed to {@code util().verifyRelPlan}.
     */
    @Test
    public void testOnCumulateWindowAggregate() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans an inner join between two CUMULATE-window aggregates that are windowed on
     * {@code proctime}.
     *
     * <p>Each side aggregates {@code MyTable} / {@code MyTable2} grouped by {@code a} and the
     * window columns; the join condition equates {@code window_start}, {@code window_end} and
     * {@code a}. The margin-stripped statement is handed to {@code util().verifyRelPlan}.
     */
    @Test
    public void testOnCumulateWindowAggregateOnProctime() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a join of two CUMULATE-window aggregates (windowed on {@code proctime}) whose
     * condition contains a non-equality predicate.
     *
     * <p>In addition to the equalities on {@code window_start}, {@code window_end} and {@code a},
     * the ON clause requires {@code CAST(L.window_start AS BIGINT) > R.uv}. The margin-stripped
     * statement is handed to {@code util().verifyRelPlan}.
     */
    @Test
    public void testWindowJoinWithNonEqui() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a AND\n        | CAST(L.window_start AS BIGINT) > R.uv\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a query that joins the output of a window join with another table on a time range.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create the source table {@code MyTable3};</li>
     *   <li>create the view {@code tmp}, a join of the TUMBLE table functions over
     *       {@code MyTable} and {@code MyTable2} that exposes {@code L.window_time} as
     *       {@code rowtime};</li>
     *   <li>verify the plan of {@code tmp JOIN MyTable3} whose condition bounds
     *       {@code tmp.rowtime} with a BETWEEN around {@code MyTable3.rowtime}.</li>
     * </ol>
     */
    @Test
    public void testTimeAttributePropagateForWindowJoin() {
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyTable3 (\n                                |  a INT,\n                                |  b STRING NOT NULL,\n                                |  c BIGINT,\n                                |  rowtime TIMESTAMP(3),\n                                |  proctime as PROCTIME(),\n                                |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                                |) with (\n                                |  'connector' = 'values'\n                                |)\n                                |")).stripMargin();
        util().tableEnv().executeSql(tableDdl);

        final String viewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp AS\n        |SELECT\n        |  L.window_time as rowtime,\n        |  L.a as a,\n        |  L.b as l_b,\n        |  L.c as l_c,\n        |  R.b as r_b,\n        |  R.c as r_c\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) L\n        |JOIN (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        util().tableEnv().executeSql(viewDdl);

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT tmp.*, MyTable3.* FROM tmp JOIN MyTable3 ON\n        | tmp.a = MyTable3.a AND\n        | tmp.rowtime BETWEEN\n        |   MyTable3.rowtime - INTERVAL '10' SECOND AND\n        |   MyTable3.rowtime + INTERVAL '1' HOUR\n        |")).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a query that joins the output of a window join over window aggregates with another
     * table on a time range.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create the source table {@code MyTable4};</li>
     *   <li>create the view {@code tmp1}, a join of two CUMULATE-window aggregates over
     *       {@code MyTable} and {@code MyTable2} that exposes {@code L.window_time} as
     *       {@code rowtime};</li>
     *   <li>verify the plan of {@code tmp1 JOIN MyTable4} whose condition bounds
     *       {@code tmp1.rowtime} with a BETWEEN around {@code MyTable4.rowtime}.</li>
     * </ol>
     */
    @Test
    public void testTimeAttributePropagateForWindowJoin1() {
        final String tableDdl = new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE MyTable4 (\n                                |  a INT,\n                                |  b STRING NOT NULL,\n                                |  c BIGINT,\n                                |  rowtime TIMESTAMP(3),\n                                |  proctime as PROCTIME(),\n                                |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                                |) with (\n                                |  'connector' = 'values'\n                                |)\n                                |")).stripMargin();
        util().tableEnv().executeSql(tableDdl);

        final String viewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp1 AS\n        |SELECT\n        |  L.window_time as rowtime,\n        |  L.a,\n        |  L.cnt as l_cnt,\n        |  L.uv as l_uv,\n        |  R.cnt as r_cnt,\n        |  R.uv as r_uv\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        util().tableEnv().executeSql(viewDdl);

        final String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT tmp1.*, MyTable4.* FROM tmp1 JOIN MyTable4 ON\n        | tmp1.a = MyTable4.a AND\n        | tmp1.rowtime BETWEEN\n        |   MyTable4.rowtime - INTERVAL '10' SECOND AND\n        |   MyTable4.rowtime + INTERVAL '1' HOUR\n        |")).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a ranking query over the window columns exposed by a window join.
     *
     * <p>First creates the view {@code tmp2}, a join of two CUMULATE-window aggregates over
     * {@code MyTable} and {@code MyTable2} that projects {@code L.window_start} and
     * {@code L.window_end}. Then verifies the plan of a query that numbers the view's rows with
     * {@code ROW_NUMBER()} partitioned by {@code window_start, window_end}, ordered by
     * {@code l_cnt DESC}, and keeps rows with {@code rownum <= 3}.
     */
    @Test
    public void testWindowPropertyPropagateForWindowJoin() {
        final String viewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tmp2 AS\n        |SELECT\n        |  L.window_start as window_start,\n        |  L.window_end as window_end,\n        |  L.a,\n        |  L.cnt as l_cnt,\n        |  L.uv as l_uv,\n        |  R.cnt as r_cnt,\n        |  R.uv as r_uv\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(\n        |    CUMULATE(\n        |      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a\n      ")).stripMargin();
        util().tableEnv().executeSql(viewDdl);

        final String rankQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY l_cnt DESC) as rownum\n        |  FROM tmp2\n        |)\n        |WHERE rownum <= 3\n      ")).stripMargin();
        util().verifyRelPlan(rankQuery);
    }

    /**
     * Plans an {@code IN} sub-query between two TUMBLE-window aggregates.
     *
     * <p>The outer aggregate over {@code MyTable} is filtered with {@code L.a IN (...)}, where the
     * sub-query selects {@code a} from the aggregate over {@code MyTable2} and is correlated on
     * {@code window_start} and {@code window_end}. The margin-stripped statement is handed to
     * {@code util().verifyRelPlan}.
     */
    @Test
    public void testSemiJoinIN() {
        final String marginedQuery = "\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE L.a IN (\n        |SELECT a FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans an {@code EXISTS} sub-query between two TUMBLE-window aggregates.
     *
     * <p>The outer aggregate over {@code MyTable} is filtered with {@code EXISTS (...)}, where the
     * sub-query reads the aggregate over {@code MyTable2} and is correlated on
     * {@code window_start}, {@code window_end} and {@code a}. The margin-stripped statement is
     * handed to {@code util().verifyRelPlan}.
     */
    @Test
    public void testSemiExist() {
        final String marginedQuery = "\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE EXISTS (\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a {@code NOT IN} sub-query between two TUMBLE-window aggregates.
     *
     * <p>The outer aggregate over {@code MyTable} is filtered with {@code L.a NOT IN (...)}, where
     * the sub-query selects {@code a} from the aggregate over {@code MyTable2} and is correlated
     * on {@code window_start} and {@code window_end}. The margin-stripped statement is handed to
     * {@code util().verifyRelPlan}.
     */
    @Test
    public void testAntiJoinNotIN() {
        final String marginedQuery = "\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE L.a NOT IN (\n        |SELECT a FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a {@code NOT EXISTS} sub-query between two TUMBLE-window aggregates.
     *
     * <p>The outer aggregate over {@code MyTable} is filtered with {@code NOT EXISTS (...)}, where
     * the sub-query reads the aggregate over {@code MyTable2} and is correlated on
     * {@code window_start}, {@code window_end} and {@code a}. The margin-stripped statement is
     * handed to {@code util().verifyRelPlan}.
     */
    @Test
    public void testAntiJoinNotExist() {
        final String marginedQuery = "\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L WHERE NOT EXISTS (\n        |SELECT * FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |WHERE L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a)\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans an inner join between two TUMBLE-window aggregates whose key comparison uses
     * {@code IS NOT DISTINCT FROM}.
     *
     * <p>The ON clause equates {@code window_start} and {@code window_end} and compares the keys
     * with {@code L.a IS NOT DISTINCT FROM R.a}. The margin-stripped statement is handed to
     * {@code util().verifyRelPlan}.
     */
    @Test
    public void testJoinWithIsNotDistinctFrom() {
        final String marginedQuery = "\n        |SELECT L.*, R.*\n        |FROM (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) L\n        |JOIN (\n        |  SELECT\n        |    a,\n        |    window_start,\n        |    window_end,\n        |    window_time,\n        |    count(*) as cnt,\n        |    count(distinct c) AS uv\n        |  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  GROUP BY a, window_start, window_end, window_time\n        |) R\n        |ON L.window_start = R.window_start AND L.window_end = R.window_end AND\n        |L.a IS NOT DISTINCT FROM R.a\n      ";
        final String query = new StringOps(Predef$.MODULE$.augmentString(marginedQuery)).stripMargin();
        util().verifyRelPlan(query);
    }

    /**
     * Plans a statement set that writes one window-join view into two sinks.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create the source table {@code food_order};</li>
     *   <li>create the temporary view {@code food_view}, which left-joins a TUMBLE-window
     *       aggregation of {@code food_order} with itself on {@code user_id},
     *       {@code window_start} and {@code window_end};</li>
     *   <li>create the tables {@code sink1} and {@code sink2} from one DDL template;</li>
     *   <li>add one {@code INSERT INTO ... SELECT * FROM food_view} per sink to a
     *       {@link StatementSet} and pass it to {@code util().verifyRelPlan}.</li>
     * </ol>
     */
    @Test
    public void testJoinToMultiSink() {
        final String sourceTableDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE food_order (\n        | user_id STRING,\n        | order_id STRING,\n        | amount INT,\n        | event_time TIMESTAMP(3),\n        | WATERMARK FOR event_time AS event_time\n        |) WITH (\n        |'connector' = 'values')\n        |")).stripMargin();
        util().tableEnv().executeSql(sourceTableDdl);

        final String viewDdl = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TEMPORARY VIEW food_view AS\n        |WITH food AS ( \n        |  SELECT user_id, \n        |         window_start, \n        |         window_end \n        |  FROM TABLE(TUMBLE(TABLE food_order, DESCRIPTOR(event_time), INTERVAL '1' MINUTES)) \n        |  GROUP BY \n        |  user_id,\n        |  window_start,\n        |  window_end)\n        |SELECT food.window_start\n        |     ,food.window_end\n        |     ,food.user_id\n        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'yyyyMMdd') AS dt\n        |     ,DATE_FORMAT(food.window_end + INTERVAL '7' HOUR, 'HH') AS `hour`\n        |FROM food\n        |LEFT JOIN food AS a ON food.user_id = a.user_id\n        |AND food.window_start = a.window_start\n        |AND food.window_end = a.window_end\n        |")).stripMargin();
        util().tableEnv().executeSql(viewDdl);

        // The single %s placeholder in the template receives the sink table name.
        final String sinkDdlTemplate = new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE %s (\n        | window_start TIMESTAMP(3),\n        | window_end TIMESTAMP(3),\n        | user_id STRING,\n        | dt STRING,\n        | `hour` STRING\n        |) WITH (\n        | 'connector' = 'values')\n        |")).stripMargin();
        for (String sinkName : new String[] {"sink1", "sink2"}) {
            util().tableEnv().executeSql(String.format(sinkDdlTemplate, sinkName));
        }

        final StatementSet inserts = util().tableEnv().createStatementSet();
        inserts.addInsertSql("INSERT INTO sink1 SELECT * FROM food_view");
        inserts.addInsertSql("INSERT INTO sink2 SELECT * FROM food_view");
        util().verifyRelPlan(inserts);
    }

    public WindowJoinTest() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable (\n                              |  a INT,\n                              |  b STRING NOT NULL,\n                              |  c BIGINT,\n                              |  rowtime TIMESTAMP(3),\n                              |  proctime as PROCTIME(),\n                              |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                              |) with (\n                              |  'connector' = 'values'\n                              |)\n                              |")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                              |CREATE TABLE MyTable2 (\n                              |  a INT,\n                              |  b STRING NOT NULL,\n                              |  c BIGINT,\n                              |  rowtime TIMESTAMP(3),\n                              |  proctime as PROCTIME(),\n                              |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n                              |) with (\n                              |  'connector' = 'values'\n                              |)\n                              |")).stripMargin());
    }
}

