/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Plan tests for temporal (lookup) joins that project a subset, all, or a reordering of the
 * lookup table's columns.
 *
 * <p>Every test template runs twice, once with {@code mode = "stream"} and once with
 * {@code mode = "batch"}. In both modes the planner is restricted to the
 * {@code LOGICAL_REWRITE} program of the corresponding optimizer chain before the plan is
 * verified.
 */
@ExtendWith(value={ParameterizedTestExtension.class})
class ProjectSnapshotTransposeRuleTest
extends TableTestBase {
    private static final String STREAM = "stream";
    private static final String BATCH = "batch";

    /** DDL template of the probe-side table; {@code %s} receives the {@code 'bounded'} flag. */
    private static final String MY_TABLE_DDL =
            "CREATE TABLE MyTable (\n"
                    + "  a int,\n"
                    + "  b varchar,\n"
                    + "  c bigint,\n"
                    + "  proctime as PROCTIME(),\n"
                    + "  rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n"
                    + "  watermark for rowtime as rowtime - INTERVAL '1' second \n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'bounded' = '%s')";

    /** DDL template of the lookup table; {@code %s} receives the {@code 'bounded'} flag. */
    private static final String LOOKUP_TABLE_DDL =
            "CREATE TABLE LookupTable (\n"
                    + "  id int,\n"
                    + "  name varchar,\n"
                    + "  age int \n"
                    + ") with (\n"
                    + "  'connector' = 'values',\n"
                    + "  'bounded' = '%s')";

    /** FROM/JOIN/ON clause shared by all queries; each test prepends its own SELECT list. */
    private static final String TEMPORAL_JOIN_CLAUSE =
            "FROM MyTable AS T\n"
                    + "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
                    + "ON T.a = D.id";

    /** Execution mode injected by the parameterized extension: {@link #STREAM} or {@link #BATCH}. */
    @Parameter
    private String mode;

    /** Mode-specific test utility, recreated before every test. */
    private TableTestUtil util;

    ProjectSnapshotTransposeRuleTest() {
    }

    /** Supplies the execution modes the test templates are run with. */
    @Parameters(name="mode = {0}")
    private static Collection<String> parameters() {
        return Arrays.asList(STREAM, BATCH);
    }

    /**
     * Creates the stream or batch test utility for the current mode, limits its optimizer to
     * the {@code LOGICAL_REWRITE} program, and registers {@code MyTable} and
     * {@code LookupTable}. Both tables are declared bounded exactly when running in batch mode.
     */
    @BeforeEach
    void setup() {
        boolean streaming = STREAM.equals(mode);
        if (streaming) {
            StreamTableTestUtil streamUtil = (StreamTableTestUtil) streamTestUtil(TableConfig.getDefault());
            util = streamUtil;
            streamUtil.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
        } else {
            BatchTableTestUtil batchUtil = (BatchTableTestUtil) batchTestUtil(TableConfig.getDefault());
            util = batchUtil;
            batchUtil.buildBatchProgram(FlinkBatchProgram.LOGICAL_REWRITE());
        }

        // Sources are bounded only in batch mode.
        boolean bounded = !streaming;
        TableEnvironment tableEnv = util.getTableEnv();
        tableEnv.executeSql(String.format(MY_TABLE_DDL, bounded));
        tableEnv.executeSql(String.format(LOOKUP_TABLE_DDL, bounded));
    }

    /** Only the join key {@code D.id} is selected from the lookup side. */
    @TestTemplate
    void testJoinTemporalTableWithProjectionPushDown() {
        util.verifyRelPlan("SELECT T.*, D.id\n" + TEMPORAL_JOIN_CLAUSE);
    }

    /** Every lookup column is selected via {@code D.*}. */
    @TestTemplate
    void testJoinTemporalTableNotProjectable() {
        util.verifyRelPlan("SELECT T.*, D.*\n" + TEMPORAL_JOIN_CLAUSE);
    }

    /** All lookup columns are selected, but in a different order than they are declared. */
    @TestTemplate
    void testJoinTemporalTableWithReorderedProject() {
        util.verifyRelPlan("SELECT T.*, D.age, D.name, D.id\n" + TEMPORAL_JOIN_CLAUSE);
    }

    /** Selects {@code D.id} while filtering on {@code D.age}, a lookup column not in the SELECT list. */
    @TestTemplate
    void testJoinTemporalTableWithProjectAndFilter() {
        util.verifyRelPlan("SELECT T.*, D.id\n" + TEMPORAL_JOIN_CLAUSE + " WHERE D.age > 20");
    }
}

