/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.join;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001\u0005]d\u0001\u0002\u0011\"\u0001QBQa\u000f\u0001\u0005\u0002qBqa\u0010\u0001C\u0002\u0013\u0005\u0001\t\u0003\u0004E\u0001\u0001\u0006I!\u0011\u0005\u0006\u000b\u0002!\tA\u0012\u0005\u00061\u0002!\tA\u0012\u0005\u0006;\u0002!\tA\u0012\u0005\u0006?\u0002!\tA\u0012\u0005\u0006C\u0002!\tA\u0012\u0005\u0006G\u0002!\tA\u0012\u0005\u0006K\u0002!\tA\u0012\u0005\u0006O\u0002!\tA\u0012\u0005\u0006S\u0002!\tA\u0012\u0005\u0006W\u0002!\tA\u0012\u0005\u0006[\u0002!\tA\u0012\u0005\u0006_\u0002!\tA\u0012\u0005\u0006c\u0002!\tA\u0012\u0005\u0006g\u0002!\tA\u0012\u0005\u0006k\u0002!\tA\u0012\u0005\u0006o\u0002!\tA\u0012\u0005\u0006s\u0002!\tA\u0012\u0005\u0006w\u0002!\tA\u0012\u0005\u0006{\u0002!\tA\u0012\u0005\u0006\u007f\u0002!\tA\u0012\u0005\u0007\u0003\u0007\u0001A\u0011\u0001$\t\r\u0005\u001d\u0001\u0001\"\u0001G\u0011\u0019\tY\u0001\u0001C\u0001\r\"1\u0011q\u0002\u0001\u0005\u0002\u0019Ca!a\u0005\u0001\t\u00031\u0005bBA\f\u0001\u0011%\u0011\u0011\u0004\u0005\n\u0003K\u0002\u0011\u0013!C\u0005\u0003OBq!!\u001d\u0001\t\u0013\t\u0019H\u0001\tUK6\u0004xN]1m\u0015>Lg\u000eV3ti*\u0011!eI\u0001\u0005U>LgN\u0003\u0002%K\u0005\u00191/\u001d7\u000b\u0005\u0019:\u0013AB:ue\u0016\fWN\u0003\u0002)S\u0005!\u0001\u000f\\1o\u0015\tQ3&A\u0004qY\u0006tg.\u001a:\u000b\u00051j\u0013!\u0002;bE2,'B\u0001\u00180\u0003\u00151G.\u001b8l\u0015\t\u0001\u0014'\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002e\u0005\u0019qN]4\u0004\u0001M\u0011\u0001!\u000e\t\u0003mej\u0011a\u000e\u0006\u0003q%\nQ!\u001e;jYNL!AO\u001c\u0003\u001bQ\u000b'\r\\3UKN$()Y:f\u0003\u0019a\u0014N\\5u}Q\tQ\b\u0005\u0002?\u00015\t\u0011%\u0001\u0003vi&dW#A!\u0011\u0005Y\u0012\u0015BA\"8\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0003\u0015)H/\u001b7!\u0003\u0019\u0011WMZ8sKR\tq\t\u0005\u0002I\u00176\t\u0011JC\u0001K\u0003\u0015\u00198-\u00197b\u0013\ta\u0015J\u0001\u0003V]&$\bF\u0001\u0003O!\tye+D\u0001Q\u0015\t\t&+A\u0002ba&T!a\u0015+\u0002\u000f),\b/\u001b;fe*\u0011Q+M\u0001\u0006UVt\u0017\u000e^\u0005\u0003/B\u0013!BQ3g_J,W)Y2i\u0003\u001d\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8P]2+w-Y2z'>,(oY3)\u0005\u0015Q\u0006CA(\\\u0013\ta\u0006K\u0001\u0003UKN$\u0018A\n;fgR\u0004&o\\2US6,G+Z7q_J\fGNS8j]>sG*Z4bGf\u001cv.\u001e:dK\"\u0012aAW\u0001\u001ai\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lg\u000e\u000b\u0002\b5\u0006qC/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o\u001f:$\u0016.\\3ti\u0006l\u0007\u000f\u0014;{%><H/[7fQ\tA!,A\u0011uKN$XI^3oiRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i-&,w\u000f\u000b\u0002\n5\u00061D/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V-\u001c9pe\u0006d'j\\5o/&$\bNV5fo^KG\u000f[\"p]N$\u0018M\u001c;D_:$\u0017\u000e^5p]\"\u0012!BW\u00017i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>LgnV5uQZKWm^,ji\"4UO\\2uS>t7i\u001c8eSRLwN\u001c\u0015\u0003\u0017i\u000b\u0001\u0006^3ti\u00163XM\u001c;US6,G+Z7q_J\fGNS8j]^KG\u000f\u001b,jK^tuN\\#rk&D#\u0001\u0004.\u0002_Q,7\u000f^#wK:$H+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"4\u0016.Z<XSRD\u0007K]3eS\u000e\fG/Z:)\u00055Q\u0016a\r;fgR,e/\u001a8u)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\b\u000e\u0015:fI&\u001c\u0017\r^3tQ\tq!,A\u0014uKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5MCN$(k\\<WS\u0016<\bFA\b[\u0003%\"Xm\u001d;Qe>\u001cG+[7f)\u0016l\u0007o\u001c:bY*{\u0017N\\,ji\"d\u0015m\u001d;WC2,XMV5fo\"\u0012\u0001CW\u0001(i\u0016\u001cH\u000f\u0015:pGRKW.\u001a+f[B|'/\u00197K_&tw+\u001b;i-&,wOT8o\u000bF,\u0018\u000e\u000b\u0002\u00125\u0006qC/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\b\u000e\u0015:fI&\u001c\u0017\r^3tQ\t\u0011\",A\u001buKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5D_6\u0004X\u000f^3e\u0007>dW/\u001c8B]\u0012\u0004Vo\u001d5E_^t\u0007FA\n[\u0003Y\"Xm\u001d;Fm\u0016tG\u000fV5nKR+W\u000e]8sC2Tu.\u001b8XSRD7i\\7qkR,GmQ8mk6t\u0017I\u001c3QkNDGi\\<oQ\t!\",\u0001\u0015uKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5CS:dwnZ*pkJ\u001cW\r\u000b\u0002\u00165\u0006)D/Z:u!J|7\rV5nKR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\bnQ8ogR\fg\u000e^\"p]\u0012LG/[8oQ\t1\",A\u001duKN$\bK]8d)&lW\rT3giR+W\u000e]8sC2Tu.\u001b8XSRDg+[3x/&$\bnQ8ogR\fg\u000e^\"p]\u0012LG/[8oQ\t9\",A\u001buKN$\bK]8d)&lW\rV3na>\u0014\u0018\r\u001c&pS:<\u0016\u000e\u001e5WS\u0016<x+\u001b;i\rVt7\r^5p]\u000e{g\u000eZ5uS>t\u0007F\u0001\r[\u0003m!Xm\u001d;J]Z\fG.\u001b3UK6\u0004xN]1m)\u0006\u0014GNS8j]\"\u0012\u0011DW\u0001&i\u0016\u001cH/\u0012<f]R$\u0016.\\3UK6\u0004xN]1m\u0015>Lg\u000eV8TS:\\w+\u001b;i!.D#A\u0007.\u0002UQ,7\u000f\u001e+f[B|'/\u00197K_&tW\u000b]:feR\u001cv.\u001e:dK^KG\u000f\u001b)pgR4\u0015\u000e\u001c;fe\"\u00121DW\u0001*i\u0016\u001cH\u000fV3na>\u0014\u0018\r\u001c&pS:,\u0006o]3siN{WO]2f/&$\b\u000e\u0015:f\r&dG/\u001a:)\u0005qQ\u0016!F3ya\u0016\u001cG/\u0012=dKB$\u0018n\u001c8UQJ|wO\u001c\u000b\b\u000f\u0006m\u00111GA\u001c\u0011\u0019!S\u00041\u0001\u0002\u001eA!\u0011qDA\u0017\u001d\u0011\t\t#!\u000b\u0011\u0007\u0005\r\u0012*\u0004\u0002\u0002&)\u0019\u0011qE\u001a\u0002\rq\u0012xn\u001c;?\u0013\r\tY#S\u0001\u0007!J,G-\u001a4\n\t\u0005=\u0012\u0011\u0007\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005-\u0012\nC\u0004\u00026u\u0001\r!!\b\u0002\u0011-,\u0017p^8sIND\u0011\"!\u000f\u001e!\u0003\u0005\r!a\u000f\u0002\u000b\rd\u0017M\u001f>1\t\u0005u\u0012q\t\t\u0007\u0003?\ty$a\u0011\n\t\u0005\u0005\u0013\u0011\u0007\u0002\u0006\u00072\f7o\u001d\t\u0005\u0003\u000b\n9\u0005\u0004\u0001\u0005\u0019\u0005%\u0013qGA\u0001\u0002\u0003\u0015\t!a\u0013\u0003\u0007}#\u0013'\u0005\u0003\u0002N\u0005M\u0003c\u0001%\u0002P%\u0019\u0011\u0011K%\u0003\u000f9{G\u000f[5oOB!\u0011QKA0\u001d\u0011\t9&a\u0017\u000f\t\u0005\r\u0012\u0011L\u0005\u0002\u0015&\u0019\u0011QL%\u0002\u000fA\f7m[1hK&!\u0011\u0011MA2\u0005%!\u0006N]8xC\ndWMC\u0002\u0002^%\u000bq$\u001a=qK\u000e$X\t_2faRLwN\u001c+ie><h\u000e\n3fM\u0006,H\u000e\u001e\u00134+\t\tI\u0007\r\u0003\u0002l\u0005=\u0004CBA\u0010\u0003\u007f\ti\u0007\u0005\u0003\u0002F\u0005=DaCA%=\u0005\u0005\t\u0011!B\u0001\u0003\u0017\n\u0001D^3sS\u001aLHK]1og2\fG/[8o'V\u001c7-Z:t)\r9\u0015Q\u000f\u0005\u0007I}\u0001\r!!\b")
public class TemporalJoinTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    public StreamTableTestUtil util() {
        return this.util;
    }

    @BeforeEach
    public void before() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE Orders (\n                    | amount INT,\n                    | currency STRING,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistory (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryWithPK (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithComputedColumn (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesBinlogWithoutWatermark (\n                    | currency STRING,\n                    | rate INT,\n                    | rate1 AS rate + 1,\n                    | proctime AS PROCTIME(),\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UB,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE UpsertRates (\n                    | currency STRING,\n                    | rate INT,\n                    | valid VARCHAR,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'changelog-mode' = 'I,UA,D',\n                    | 'disable-lookup' = 'true'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesOnly (\n                    | currency STRING,\n                    | rate INT,\n                    | proctime AS PROCTIME()\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE RatesHistoryLegacy (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime,\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'COLLECTION',\n                    | 'is-bounded' = 'false'\n                    |)\n      ")).stripMargin());
        this.util().addTable(" CREATE VIEW rates_last_row_rowtime AS SELECT currency, rate, rowtime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY rowtime DESC) AS rowNum    FROM RatesHistory  ) T   WHERE rowNum = 1");
        this.util().addTable(" CREATE VIEW rates_last_row_proctime AS SELECT T.currency, T.rate, T.proctime FROM   (SELECT *,           ROW_NUMBER() OVER (PARTITION BY currency ORDER BY proctime DESC) AS rowNum    FROM RatesOnly  ) T   WHERE T.rowNum = 1");
        this.util().addTable("CREATE VIEW rates_last_value AS SELECT currency, LAST_VALUE(rate) AS rate FROM RatesHistory GROUP BY currency ");
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE OrdersLtz (\n                                | amount INT,\n                                | currency STRING,\n                                | ts BIGINT,\n                                | rowtime AS TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TABLE RatesLtz (\n                                | currency STRING,\n                                | rate INT,\n                                | ts BIGINT,\n                                | rowtime as TO_TIMESTAMP_LTZ(ts, 3),\n                                | WATERMARK FOR rowtime AS rowtime,\n                                | PRIMARY KEY(currency) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values'\n                                |)\n      ")).stripMargin());
    }

    @Test
    public void testEventTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinOnLegacySource() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryLegacy FOR SYSTEM_TIME AS OF o.proctime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoin() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinOnTimestampLtzRowtime() {
        String sqlQuery = "SELECT * FROM OrdersLtz AS o JOIN RatesLtz FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeLeftTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency AND amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastRowView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_proctime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithLastValueView() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewNonEqui() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > r.rate";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithPredicates() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_value FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testEventTimeTemporalJoinWithComputedColumnAndPushDown() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithComputedColumn FOR SYSTEM_TIME AS OF o.rowtime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithBinlogSource() {
        String sqlQuery = "SELECT o.currency, r.currency, rate1 FROM Orders AS o JOIN RatesBinlogWithoutWatermark FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND o.amount > 10 AND r.rate < 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeLeftTemporalJoinWithViewWithConstantCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o LEFT JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND r.rate + 1 = 100";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testProcTimeTemporalJoinWithViewWithFunctionCondition() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN rates_last_row_rowtime FOR SYSTEM_TIME AS OF o.proctime AS r on o.currency = r.currency AND 'RMB-100' = concat('RMB-', cast(r.rate AS STRING))";
        this.util().verifyExecPlan(sqlQuery);
    }

    @Test
    public void testInvalidTemporalTablJoin() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE leftTableWithoutTimeAttribute (\n                    | amount INT,\n                    | currency STRING,\n                    | ts TIMESTAMP(3)\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery1 = "SELECT * FROM leftTableWithoutTimeAttribute AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.ts AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery1, new StringBuilder(101).append("Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF'").append(" left table's time attribute field").toString(), ValidationException.class);
        String sqlQuery2 = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.amount = r.rate";
        this.expectExceptionThrown(sqlQuery2, "Temporal table's primary key [currency0] must be included in the equivalence condition of temporal join, but current temporal join condition is [amount=rate].", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutPk (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | WATERMARK FOR rowtime AS rowtime\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery3 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutPk FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery3, "Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:\nFlinkLogicalJoin(condition=[AND(=($1, $4), __INITIAL_TEMPORAL_JOIN_CONDITION($2, $6, __TEMPORAL_JOIN_LEFT_KEY($1), __TEMPORAL_JOIN_RIGHT_KEY($4)))], joinType=[inner])", ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutTimeAttribute (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery4 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutTimeAttribute FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery4, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE versionedTableWithoutRowtime (\n                    | currency STRING,\n                    | rate INT,\n                    | rowtime TIMESTAMP(3),\n                    | proctime AS PROCTIME(),\n                    | PRIMARY KEY(currency) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values'\n                    |)\n      ")).stripMargin());
        String sqlQuery5 = "SELECT * FROM Orders AS o JOIN versionedTableWithoutRowtime FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery5, new StringBuilder(139).append("Event-Time Temporal Table Join requires both primary key and row time attribute in ").append("versioned table, but no row time attribute can be found.").toString(), ValidationException.class);
        String sqlQuery8 = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT *\n         | FROM OrdersLtz AS o JOIN\n         | RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime AS r\n         | ON o.currency = r.currency\n          ")).stripMargin();
        this.expectExceptionThrown(sqlQuery8, "Event-Time Temporal Table Join requires same rowtime type in left table and versioned table, but the rowtime types are TIMESTAMP_LTZ(3) *ROWTIME* and TIMESTAMP(3) *ROWTIME*.", ValidationException.class);
        String sqlQuery9 = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF 'o.rowtime' AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery9, "The system time period specification expects Timestamp type but is 'CHAR'", ValidationException.class);
        String sqlQuery10 = "SELECT * FROM Orders AS o JOIN RatesHistoryWithPK FOR SYSTEM_TIME AS OF o.rowtime + INTERVAL '1' SECOND AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery10, "Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field'", ValidationException.class);
    }

    @Test
    public void testEventTimeTemporalJoinToSinkWithPk() {
        TableEnvironment tEnv = this.util().tableEnv();
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE orders_rowtime (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  amount BIGINT,\n                       |  order_time TIMESTAMP(3),\n                       |  WATERMARK FOR order_time AS order_time,\n                       |  PRIMARY KEY (order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeOrderDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE versioned_currency_with_multi_key (\n                       |  currency STRING,\n                       |  currency_no STRING,\n                       |  rate  BIGINT,\n                       |  currency_time TIMESTAMP(3),\n                       |  WATERMARK FOR currency_time AS currency_time - interval '10' SECOND,\n                       |  PRIMARY KEY(currency, currency_no) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'changelog-mode' = 'I,UA,UB,D',\n                       |  'data-id' = 'rowTimeCurrencyDataId'\n                       |)\n                       |")).stripMargin());
        tEnv.executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                       |CREATE TABLE rowtime_default_sink (\n                       |  order_id BIGINT,\n                       |  currency STRING,\n                       |  amount BIGINT,\n                       |  l_time TIMESTAMP(3),\n                       |  rate BIGINT,\n                       |  r_time TIMESTAMP(3),\n                       |  PRIMARY KEY(order_id) NOT ENFORCED\n                       |) WITH (\n                       |  'connector' = 'values',\n                       |  'sink-insert-only' = 'false',\n                       |  'changelog-mode' = 'I,UA,UB,D'\n                       |)\n                       |")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO rowtime_default_sink\n        |  SELECT o.order_id, o.currency, o.amount, o.order_time, r.rate, r.currency_time\n        |  FROM orders_rowtime AS o JOIN versioned_currency_with_multi_key\n        |    FOR SYSTEM_TIME AS OF o.order_time as r\n        |      ON o.currency_no = r.currency_no AND o.currency = r.currency\n        |")).stripMargin();
        this.util().verifyExplainInsert(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinUpsertSourceWithPostFilter() {
        String sqlQuery = "SELECT * FROM Orders AS o JOIN UpsertRates FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency WHERE valid = 'true'";
        this.util().verifyExplain(sqlQuery, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testTemporalJoinUpsertSourceWithPreFilter() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY VIEW V1 AS\n                                |SELECT * FROM UpsertRates WHERE valid = 'true'\n                                |")).stripMargin());
        String sqlQuery = "SELECT * FROM Orders AS o JOIN V1 FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency";
        this.expectExceptionThrown(sqlQuery, "Filter is not allowed for right changelog input of event time temporal join, it will corrupt the versioning of data.", TableException.class);
    }

    private void expectExceptionThrown(String sql, String keywords, Class<? extends Throwable> clazz) {
        ThrowableAssert.ThrowingCallable callable = () -> this.verifyTranslationSuccess(sql);
        if (keywords != null) {
            Assertions.assertThatExceptionOfType(clazz).isThrownBy(callable).withMessageContaining(keywords);
            return;
        }
        Assertions.assertThatExceptionOfType(clazz).isThrownBy(callable);
    }

    private Class<? extends Throwable> expectExceptionThrown$default$3() {
        return ValidationException.class;
    }

    private void verifyTranslationSuccess(String sql) {
        this.util().tableEnv().sqlQuery(sql).explain(new ExplainDetail[0]);
    }
}

