/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@ScalaSignature(bytes="\u0006\u0001I3Aa\u0003\u0007\u0001?!)a\u0005\u0001C\u0001O!9!\u0006\u0001b\u0001\n\u0013Y\u0003BB\u0018\u0001A\u0003%A\u0006C\u00031\u0001\u0011\u0005\u0011\u0007C\u0003D\u0001\u0011\u0005\u0011\u0007C\u0003I\u0001\u0011\u0005\u0011\u0007C\u0003K\u0001\u0011\u0005\u0011\u0007C\u0003M\u0001\u0011\u0005\u0011\u0007C\u0003O\u0001\u0011\u0005\u0011\u0007C\u0003Q\u0001\u0011\u0005\u0011G\u0001\u001bXCR,'/\\1sW\u0006\u001b8/[4oKJ\u001c\u0005.\u00198hK2|wMT8s[\u0006d\u0017N_3Ue\u0006t7\u000f]8tKJ+H.\u001a+fgRT!!\u0004\b\u0002\rM$(/Z1n\u0015\ty\u0001#\u0001\u0005qQf\u001c\u0018nY1m\u0015\t\t\"#A\u0003sk2,7O\u0003\u0002\u0014)\u0005!\u0001\u000f\\1o\u0015\t)b#A\u0004qY\u0006tg.\u001a:\u000b\u0005]A\u0012!\u0002;bE2,'BA\r\u001b\u0003\u00151G.\u001b8l\u0015\tYB$\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002;\u0005\u0019qN]4\u0004\u0001M\u0011\u0001\u0001\t\t\u0003C\u0011j\u0011A\t\u0006\u0003GQ\tQ!\u001e;jYNL!!\n\u0012\u0003\u001bQ\u000b'\r\\3UKN$()Y:f\u0003\u0019a\u0014N\\5u}Q\t\u0001\u0006\u0005\u0002*\u00015\tA\"\u0001\u0003vi&dW#\u0001\u0017\u0011\u0005\u0005j\u0013B\u0001\u0018#\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0003\u0015)H/\u001b7!\u0003\u0015\u0019X\r^;q)\u0005\u0011\u0004CA\u001a7\u001b\u0005!$\"A\u001b\u0002\u000bM\u001c\u0017\r\\1\n\u0005]\"$\u0001B+oSRD#\u0001B\u001d\u0011\u0005i\nU\"A\u001e\u000b\u0005qj\u0014aA1qS*\u0011ahP\u0001\bUV\u0004\u0018\u000e^3s\u0015\t\u0001E$A\u0003kk:LG/\u0003\u0002Cw\tQ!)\u001a4pe\u0016,\u0015m\u00195\u0002AQ,7\u000f\u001e)vg\"$wn\u001e8XCR,'/\\1sW^KG\u000f[8vi\u000e\u000bGn\u0019\u0015\u0003\u000b\u0015\u0003\"A\u000f$\n\u0005\u001d[$\u0001\u0002+fgR\fA\u0006^3tiB+8\u000f\u001b3po:\u001c\u0015\r\\2B]\u0012<\u0016\r^3s[\u0006\u00148.Q:tS\u001etWM],ji\"\u001c\u0015\r\\2)\u0005\u0019)\u0015!\n;fgR\u0004Vo\u001d5e_^tw+\u0019;fe6\f'o[!tg&<g.\u001a:XSRD7)\u00197dQ\t9Q)A\u0018uKN$\b+^:iI><hNT3x\u0007\u0006d7-\u00118e/\u0006$XM]7be.\f5o]5h]\u0016\u0014x+\u001b;i\u0007\u0006d7\r\u000b\u0002\t\u000b\u0006aB/Z:u\u000fJ|W\u000f]&fs&\u001b8i\\7qkR,GmQ8mk6t\u0007FA\u0005F\u00039\"Xm\u001d;QkNDGm\\<o\u0007\u0006d7MT8u\u0003\u001a4Wm\u0019;DQ\u0006tw-\u001a7pO:{'/\\1mSj,7*Z=)\u0005))\u0005")
public class WatermarkAssignerChangelogNormalizeTransposeRuleTest
extends TableTestBase {
    private final StreamTableTestUtil util = this.streamTestUtil(this.streamTestUtil$default$1());

    private StreamTableTestUtil util() {
        return this.util;
    }

    @BeforeEach
    public void setup() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE simple_src (\n                     |  currency STRING,\n                     |  currency_no STRING,\n                     |  rate  BIGINT,\n                     |  currency_time TIMESTAMP(3),\n                     |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     |  PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = 'UA,D',\n                     |  'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE src_with_computed_column (\n                     |  currency STRING,\n                     |  currency_no STRING,\n                     |  rate  BIGINT,\n                     |  c STRING,\n                     |  currency_time as to_timestamp(c),\n                     |  WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     |  PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     |  'connector' = 'values',\n                     |  'changelog-mode' = 'UA,D',\n                     |  'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                     |CREATE TABLE src_with_computed_column2 (\n                     | currency int,\n                     | currency2 as currency + 2,\n                     | currency_no STRING,\n                     | rate BIGINT,\n                     | c STRING,\n                     | currency_time as to_timestamp(c),\n                     | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND,\n                     | PRIMARY KEY(currency) NOT ENFORCED\n                     |) WITH (\n                     | 'connector' = 'values',\n                     | 'changelog-mode' = 'UA,D',\n                     | 'enable-watermark-push-down' = 'true'\n                     |)\n                     |")).stripMargin());
    }

    @Test
    public void testPushdownWatermarkWithoutCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM simple_src\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownCalcAndWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM src_with_computed_column\n        |GROUP BY currency, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |  MAX(rate) AS max_rate\n        |FROM simple_src\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownNewCalcAndWatermarkAssignerWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end,\n        |  MAX(rate) AS max_rate\n        |FROM src_with_computed_column\n        |GROUP BY TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testGroupKeyIsComputedColumn() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  currency2,\n        |  COUNT(1) AS cnt,\n        |  TUMBLE_START(currency_time, INTERVAL '5' SECOND) as w_start,\n        |  TUMBLE_END(currency_time, INTERVAL '5' SECOND) as w_end\n        |FROM src_with_computed_column2\n        |GROUP BY currency2, TUMBLE(currency_time, INTERVAL '5' SECOND)\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }

    @Test
    public void testPushdownCalcNotAffectChangelogNormalizeKey() {
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE t1 (\n                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',\n                    |  a VARCHAR NOT NULL,\n                    |  b VARCHAR NOT NULL,\n                    |  WATERMARK FOR ingestion_time AS ingestion_time\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'readable-metadata' = 'ts:TIMESTAMP(3)'\n                    |)\n      ")).stripMargin());
        this.util().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                    |CREATE TABLE t2 (\n                    |  k VARBINARY,\n                    |  ingestion_time TIMESTAMP(3) METADATA FROM 'ts',\n                    |  a VARCHAR NOT NULL,\n                    |  f BOOLEAN NOT NULL,\n                    |  WATERMARK FOR `ingestion_time` AS `ingestion_time`,\n                    |  PRIMARY KEY (`a`) NOT ENFORCED\n                    |) WITH (\n                    | 'connector' = 'values',\n                    | 'readable-metadata' = 'ts:TIMESTAMP(3)',\n                    | 'changelog-mode' = 'I,UA,D'\n                    |)\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT t1.a, t1.b, t2.f\n        |FROM t1 INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.ingestion_time\n        | ON t1.a = t2.a WHERE t2.f = true\n        |")).stripMargin();
        this.util().verifyRelPlan(sql, (Seq<ExplainDetail>)Predef$.MODULE$.wrapRefArray((Object[])new ExplainDetail[]{ExplainDetail.CHANGELOG_MODE}));
    }
}

