/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.time.ZoneId;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.hep.HepMatchOrder;
import org.apache.calcite.rel.rules.CoreRules;
import org.apache.calcite.tools.RuleSets;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalCalc;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalWatermarkAssigner;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgramBuilder;
import org.apache.flink.table.planner.plan.optimize.program.HEP_RULES_EXECUTION_TYPE;
import org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
import org.apache.flink.table.planner.plan.rules.logical.FlinkCalcMergeRule;
import org.apache.flink.table.planner.plan.rules.logical.PushFilterInCalcIntoTableSourceScanRule;
import org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoTableSourceScanRuleTestBase;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Plan tests for {@link PushFilterInCalcIntoTableSourceScanRule}.
 *
 * <p>Each test runs against a stream test utility whose optimizer is replaced by a two-stage
 * chained program (see {@link #setup()}): a Volcano stage that converts the plan to the Flink
 * logical convention, followed by a Hep stage that applies only the rule under test. The shared
 * test cases are inherited from {@link PushFilterIntoTableSourceScanRuleTestBase}; the overrides
 * here register the {@code MTable} table before delegating to the inherited test body.
 */
class PushFilterInCalcIntoTableSourceRuleTest
        extends PushFilterIntoTableSourceScanRuleTestBase {

    /**
     * Builds a fresh test utility, installs the custom optimizer program and registers the tables
     * {@code MyTable}, {@code VirtualTable} and {@code WithWatermark}.
     */
    @BeforeEach
    void setup() {
        util = streamTestUtil(TableConfig.getDefault());

        // Parameterized with the stream context so that no raw type or unchecked cast is needed
        // when handing the program to the test utility below.
        FlinkChainedProgram<StreamOptimizeContext> program = new FlinkChainedProgram<>();

        // Stage 1: turn projections/filters into Calc nodes, merge adjacent Calcs and convert the
        // plan to the Flink logical convention.
        program.addLast(
                "Converters",
                FlinkVolcanoProgramBuilder.<StreamOptimizeContext>newBuilder()
                        .add(
                                RuleSets.ofList(
                                        CoreRules.PROJECT_TO_CALC,
                                        CoreRules.FILTER_TO_CALC,
                                        FlinkCalcMergeRule.INSTANCE,
                                        FlinkLogicalCalc.CONVERTER(),
                                        FlinkLogicalTableSourceScan.CONVERTER(),
                                        FlinkLogicalWatermarkAssigner.CONVERTER()))
                        .setRequiredOutputTraits(new Convention[] {FlinkConventions.LOGICAL()})
                        .build());

        // Stage 2: apply only the rule under test, matching bottom-up.
        program.addLast(
                "Filter push in calc down",
                FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
                        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
                        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                        .add(RuleSets.ofList(PushFilterInCalcIntoTableSourceScanRule.INSTANCE))
                        .build());

        ((StreamTableTestUtil) util).replaceStreamProgram(program);

        // Bounded 'values' table that lists only 'amount' under 'filterable-fields'.
        String ddl1 =
                "CREATE TABLE MyTable (\n"
                        + "  name STRING,\n"
                        + "  id bigint,\n"
                        + "  amount int,\n"
                        + "  price double\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'filterable-fields' = 'amount',\n"
                        + " 'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(ddl1);

        // Same as MyTable plus a computed column derived from 'amount'.
        String ddl2 =
                "CREATE TABLE VirtualTable (\n"
                        + "  name STRING,\n"
                        + "  id bigint,\n"
                        + "  amount int,\n"
                        + "  virtualField as amount + 1,\n"
                        + "  price double\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'filterable-fields' = 'amount',\n"
                        + " 'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(ddl2);

        // Unbounded table with a watermark on 'event_time'; only 'name' is listed as filterable.
        String ddl3 =
                "CREATE TABLE WithWatermark ("
                        + "  name STRING,\n"
                        + "  event_time TIMESTAMP(3),\n"
                        + "  WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'false',\n"
                        + " 'filterable-fields' = 'name',\n"
                        + " 'disable-lookup' = 'true')";
        util.tableEnv().executeSql(ddl3);
    }

    /**
     * Verifies the plan of a filter on {@code LOWER(name)} over the watermarked table.
     *
     * <p>NOTE(review): judging by the test name, the expected plan keeps the filter out of the
     * source because the watermark is not pushed down by this program; confirm against the
     * recorded expected plan.
     */
    @Test
    void testFailureToPushFilterIntoSourceWithoutWatermarkPushdown() {
        util.verifyRelPlan("SELECT * FROM WithWatermark WHERE LOWER(name) = 'foo'");
    }

    /**
     * Registers a bounded {@code MTable} with two filterable STRING columns, then runs the
     * inherited test (which presumably queries that table).
     */
    @Override
    @Test
    void testLowerUpperPushdown() {
        String ddl =
                "CREATE TABLE MTable (\n"
                        + "  a STRING,\n"
                        + "  b STRING\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'filterable-fields' = 'a;b',\n"
                        + " 'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(ddl);
        super.testLowerUpperPushdown();
    }

    /**
     * Registers an unbounded {@code MTable} with two filterable TIMESTAMP(3) columns, then runs
     * the inherited test (which presumably queries that table).
     */
    @Override
    @Test
    void testWithInterval() {
        String ddl =
                "CREATE TABLE MTable (\n"
                        + "a TIMESTAMP(3),\n"
                        + "b TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'false',\n"
                        + " 'filterable-fields' = 'a;b',\n"
                        + " 'disable-lookup' = 'true')";
        util.tableEnv().executeSql(ddl);
        super.testWithInterval();
    }

    /**
     * Registers an unbounded {@code MTable} whose filterable column {@code a} is
     * TIMESTAMP_LTZ(3), then runs the inherited test with the session time zone temporarily set
     * to {@code Asia/Shanghai}.
     */
    @Override
    @Test
    public void testWithTimestampWithTimeZone() {
        String ddl =
                "CREATE TABLE MTable (\n"
                        + "a TIMESTAMP_LTZ(3),\n"
                        + "b TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values',\n"
                        + " 'bounded' = 'false',\n"
                        + " 'filterable-fields' = 'a',\n"
                        + " 'disable-lookup' = 'true')";
        util.tableEnv().executeSql(ddl);

        ZoneId preZoneId = util.tableEnv().getConfig().getLocalTimeZone();
        util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
        try {
            super.testWithTimestampWithTimeZone();
        } finally {
            // Restore the previous zone even if the inherited test fails.
            util.tableEnv().getConfig().setLocalTimeZone(preZoneId);
        }
    }
}

