/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.optimize.program;

import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionImpl;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataLong;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.planner.factories.TestValuesCatalog;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Plan tests for the batch runtime filter optimization program.
 *
 * <p>Every test registers table statistics in a {@link TestValuesCatalog}, optionally tweaks
 * optimizer options, and then hands a SQL query to {@code util.verifyPlan(...)}. The expected
 * plans are not part of this file, so the comments below only describe how each scenario is set
 * up.
 */
public class FlinkRuntimeFilterProgramTest extends TableTestBase {

    /** Row count registered for the {@code dim} table in the "suitable statistics" scenarios. */
    public static final long SUITABLE_DIM_ROW_COUNT = 128L * 1024L;

    /** Row count registered for the {@code fact} table in the "suitable statistics" scenarios. */
    public static final long SUITABLE_FACT_ROW_COUNT = 1024L * 1024L * 1024L;

    /** Equi-join of {@code fact} and {@code dim} with a filter on {@code dim}; shared by several tests. */
    private static final String SIMPLE_INNER_JOIN_QUERY =
            "select * from fact, dim where fact.amount = dim.amount and dim.price < 500";

    private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
    private final TestValuesCatalog catalog =
            new TestValuesCatalog("testCatalog", "test_database", true);

    /**
     * Registers the test catalog, enables the runtime filter with a 10m max build data size, and
     * creates the bounded {@code dim} and {@code fact} source tables.
     */
    @BeforeEach
    void setup() {
        catalog.open();
        util.tableEnv().registerCatalog("testCatalog", catalog);
        util.tableEnv().useCatalog("testCatalog");

        TableConfig tableConfig = util.tableEnv().getConfig();
        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_ENABLED, true);
        tableConfig.set(
                OptimizerConfigOptions.TABLE_OPTIMIZER_RUNTIME_FILTER_MAX_BUILD_DATA_SIZE,
                MemorySize.parse("10m"));

        String dimDdl =
                "create table dim (\n"
                        + "  id BIGINT,\n"
                        + "  male BOOLEAN,\n"
                        + "  amount BIGINT,\n"
                        + "  price BIGINT,\n"
                        + "  dim_date_sk BIGINT\n"
                        + ")  with (\n"
                        + " 'connector' = 'values',\n"
                        + " 'runtime-source' = 'NewSource',\n"
                        + " 'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(dimDdl);

        String factDdl =
                "create table fact (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  amount BIGINT,\n"
                        + "  price BIGINT,\n"
                        + "  fact_date_sk BIGINT\n"
                        + ") with (\n"
                        + "  'connector' = 'values',\n"
                        + "  'runtime-source' = 'NewSource',\n"
                        + "  'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(factDdl);
    }

    @Test
    void testSimpleInnerJoin() throws Exception {
        setupSuitableTableStatistics();
        util.verifyPlan(SIMPLE_INNER_JOIN_QUERY);
    }

    @Test
    void testSemiJoin() throws Exception {
        setupSuitableTableStatistics();
        // -1 disables broadcasting according to the option's documentation.
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
        String query =
                "select * from fact where fact.fact_date_sk in (select dim_date_sk from dim where dim.price < 500)";
        util.verifyPlan(query);
    }

    @Test
    void testLeftOuterJoinWithLeftBuild() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "select * from dim left outer join fact on fact.amount = dim.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testLeftOuterJoinWithRightBuild() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "select * from fact left outer join dim on fact.amount = dim.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testFullOuterJoin() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "select * from fact full outer join (select * from dim where dim.price < 500) on fact_date_sk = dim_date_sk";
        util.verifyPlan(query);
    }

    @Test
    void testAntiJoin() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "select * from fact where fact.fact_date_sk not in (select dim_date_sk from dim where dim.price < 500)";
        util.verifyPlan(query);
    }

    @Test
    void testNestedLoopJoin() throws Exception {
        // A single-row dim table; presumably small enough for the planner to pick a
        // nested loop join, as the test name suggests.
        setupTableRowCount("dim", 1L);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan(SIMPLE_INNER_JOIN_QUERY);
    }

    @Test
    void testProbeSideIsTooSmall() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        // The fact table is 8x smaller than in the "suitable" scenario.
        setupTableRowCount("fact", 128L * 1024L * 1024L);
        util.verifyPlan(SIMPLE_INNER_JOIN_QUERY);
    }

    @Test
    void testBuildSideIsTooLarge() throws Exception {
        // The dim table is 8x larger than in the "suitable" scenario.
        setupTableRowCount("dim", 1024L * 1024L);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
        util.verifyPlan(SIMPLE_INNER_JOIN_QUERY);
    }

    @Test
    void testFilterRatioIsTooSmall() throws Exception {
        setupSuitableTableStatistics();
        // Register close NDVs (768 vs. 1024) for the join key on both sides.
        setupTableColumnNdv("dim", "amount", 768L);
        setupTableColumnNdv("fact", "amount", 1024L);
        util.verifyPlan(SIMPLE_INNER_JOIN_QUERY);
    }

    @Test
    void testBuildSideIsJoinWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("fact2", "id BIGINT", "amount BIGINT", "price BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        String query =
                "select * from dim, fact, fact2 where fact.amount = fact2.amount and fact.amount = dim.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testBuildSideIsJoinWithTwoAggInputs() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("fact2", "id BIGINT", "amount BIGINT", "price BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
        util.getTableEnv()
                .getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.ONE_PHASE);
        String query =
                "select * from fact join"
                        + " (select * from"
                        + " (select dim_date_sk, sum(dim.price) from dim group by dim_date_sk) agg1"
                        + " join (select dim_date_sk, sum(dim.amount) from dim group by dim_date_sk) agg2"
                        + " on agg1.dim_date_sk = agg2.dim_date_sk) as dimSide"
                        + " on fact.fact_date_sk = dimSide.dim_date_sk";
        util.verifyPlan(query);
    }

    @Test
    void testBuildSideIsLeftJoinWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable(
                "fact2", "id BIGINT", "amount BIGINT", "price BIGINT", "fact_date_sk BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        String query =
                "select * from fact2 join"
                        + " (select * from fact left join dim on dim.amount = fact.amount and dim.price < 500) as dimSide"
                        + " on fact2.amount = dimSide.amount";
        util.verifyPlan(query);
    }

    @Test
    void testBuildSideIsAggWithoutExchange() throws Exception {
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
        util.getTableEnv()
                .getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.ONE_PHASE);
        setupSuitableTableStatistics();
        String query =
                "select * from fact join"
                        + " (select dim_date_sk, sum(dim.price) from dim where  dim.price < 500 group by dim_date_sk) dimSide"
                        + " on fact.fact_date_sk = dimSide.dim_date_sk";
        util.verifyPlan(query);
    }

    @Test
    void testBuildSideIsCalcWithoutExchange() throws Exception {
        util.getTableEnv()
                .getConfig()
                .set(OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD, -1L);
        util.getTableEnv()
                .getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.ONE_PHASE);
        setupSuitableTableStatistics();
        String query =
                "select * from fact join"
                        + " (select dim_date_sk, sum(dim.price) + 1 as sum_price from dim"
                        + " where  dim.price < 500 group by dim_date_sk) dimSide"
                        + " on fact.fact_date_sk = dimSide.dim_date_sk";
        util.verifyPlan(query);
    }

    @Test
    void testCannotInjectMoreThanOneRuntimeFilterInSamePlace() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("dim2", "id BIGINT", "amount BIGINT", "price BIGINT");
        setupTableRowCount("dim2", SUITABLE_DIM_ROW_COUNT);
        String query =
                "select * from fact, dim, dim2 where fact.amount = dim.amount and fact.amount = dim2.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testPushDownProbeSideWithCalc() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "select * from dim, fact where dim.amount = fact.amount and dim.price < 500 and fact.price > 600";
        util.verifyPlan(query);
    }

    @Test
    void testCannotPushDownProbeSideWithCalc() throws Exception {
        setupSuitableTableStatistics();
        // The probe-side join key is a computed RAND(10) column rather than a plain field.
        String query =
                "select * from dim inner join"
                        + " (select fact_date_sk, RAND(10) as random from fact) as factSide"
                        + " on dim.amount = factSide.random and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testPushDownProbeSideToAllInputsOfJoin() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("fact2", "id BIGINT", "amount BIGINT", "price BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        String query =
                "select * from fact, fact2, dim where fact.amount = fact2.amount and fact.amount = dim.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testPushDownProbeSideToOneInputOfJoin() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("fact2", "id BIGINT", "amount BIGINT", "price BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        String query =
                "select * from fact, fact2, dim where fact.price = fact2.price and fact.amount = dim.amount and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testCannotPushDownProbeSideWithJoin() throws Exception {
        setupSuitableTableStatistics();
        createBoundedValuesTable("fact2", "id2 BIGINT", "amount2 BIGINT", "price2 BIGINT");
        setupTableRowCount("fact2", SUITABLE_FACT_ROW_COUNT);
        // The outer join keys come from different inputs of the inner join (fact.amount, fact2.price2).
        String query =
                "select * from"
                        + " (select * from fact inner join fact2 on fact.id = fact2.id2) as factSide"
                        + " inner join dim on factSide.amount = dim.amount"
                        + " and factSide.price2 = dim.price and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testPushDownProbeSideWithAgg() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        // The fact table is 1024x larger than in the "suitable" scenario.
        setupTableRowCount("fact", 1024L * SUITABLE_FACT_ROW_COUNT);
        util.getTableEnv()
                .getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.ONE_PHASE);
        String query =
                "select * from dim join"
                        + " (select id, fact_date_sk, sum(fact.price) from fact group by (id, fact_date_sk)) factSide"
                        + " on dim.dim_date_sk = factSide.fact_date_sk and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testCannotPushDownProbeSideWithAgg() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        setupTableRowCount("fact", 1024L * SUITABLE_FACT_ROW_COUNT);
        util.getTableEnv()
                .getConfig()
                .set(
                        OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
                        AggregatePhaseStrategy.ONE_PHASE);
        // The join key on the probe side is the aggregate result (sum_price), not a grouping key.
        String query =
                "select * from dim join"
                        + " (select id, fact_date_sk, sum(fact.price) as sum_price from fact"
                        + " group by (id, fact_date_sk)) factSide"
                        + " on dim.price = factSide.sum_price and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testPushDownProbeSideWithUnion() throws Exception {
        setupSuitableTableStatistics();
        String query =
                "Select * from"
                        + " (select id, fact_date_sk, amount as amount1 from fact where price < 500"
                        + " union all select id, fact_date_sk, amount from fact where price > 600) fact2, dim"
                        + " where fact2.fact_date_sk = dim.dim_date_sk and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testDoesNotApplyRuntimeFilterAndDPPOnSameKey() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        createPartitionedFactTable(SUITABLE_FACT_ROW_COUNT);
        // The join key fact_date_sk is also the partition key of fact_part.
        String query =
                "select * from dim, fact_part where fact_part.fact_date_sk = dim.dim_date_sk and dim.price < 500";
        util.verifyPlan(query);
    }

    @Test
    void testProbeSideIsTableSourceWithoutExchange() throws Exception {
        setupSuitableTableStatistics();
        // Same shape as the simple inner join, but with an equality filter on dim.price.
        String query =
                "select * from fact, dim where fact.amount = dim.amount and dim.price = 500";
        util.verifyPlan(query);
    }

    /**
     * Creates the partitioned {@code fact_part} table, adds one partition to the catalog and
     * attaches statistics with the given row count to that partition.
     *
     * @param rowCount row count stored in the partition statistics
     * @throws Exception if the catalog rejects the partition or its statistics
     */
    private void createPartitionedFactTable(long rowCount) throws Exception {
        util.tableEnv()
                .executeSql(
                        "CREATE TABLE fact_part (\n"
                                + "  id BIGINT,\n"
                                + "  name STRING,\n"
                                + "  amount BIGINT,\n"
                                + "  price BIGINT,\n"
                                + "  fact_date_sk BIGINT\n"
                                + ") PARTITIONED BY (fact_date_sk)\n"
                                + "WITH (\n"
                                + " 'connector' = 'values',\n"
                                + " 'runtime-source' = 'NewSource',\n"
                                + " 'partition-list' = 'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n"
                                + " 'dynamic-filtering-fields' = 'fact_date_sk;amount',\n"
                                + " 'bounded' = 'true'\n"
                                + ")");

        ObjectPath tablePath =
                new ObjectPath(util.getTableEnv().getCurrentDatabase(), "fact_part");
        CatalogPartitionSpec partSpec =
                new CatalogPartitionSpec(Collections.singletonMap("fact_date_sk", "666"));
        catalog.createPartition(
                tablePath, partSpec, new CatalogPartitionImpl(new HashMap<>(), ""), true);
        catalog.alterPartitionStatistics(
                tablePath, partSpec, new CatalogTableStatistics(rowCount, 10, 1000L, 2000L), true);
    }

    /**
     * Creates a bounded table backed by the {@code values} connector in the current catalog.
     *
     * <p>The generated DDL puts each column definition on its own line, indented by two spaces.
     *
     * @param tableName name of the table to create
     * @param columnDefinitions column definitions such as {@code "id BIGINT"}, in declaration order
     */
    private void createBoundedValuesTable(String tableName, String... columnDefinitions) {
        String ddl =
                "create table " + tableName + " (\n"
                        + "  " + String.join(",\n  ", columnDefinitions) + "\n"
                        + ") with (\n"
                        + " 'connector' = 'values',\n"
                        + "  'runtime-source' = 'NewSource',\n"
                        + " 'bounded' = 'true'\n"
                        + ")";
        util.tableEnv().executeSql(ddl);
    }

    /** Registers the default row counts for the {@code dim} and {@code fact} tables. */
    private void setupSuitableTableStatistics() throws Exception {
        setupTableRowCount("dim", SUITABLE_DIM_ROW_COUNT);
        setupTableRowCount("fact", SUITABLE_FACT_ROW_COUNT);
    }

    /**
     * Replaces the catalog statistics of a table in the current database.
     *
     * @param tableName table whose statistics are altered
     * @param rowCount row count to register; the remaining statistics fields are set to 1
     * @throws Exception if the catalog cannot alter the statistics (e.g. the table is missing)
     */
    private void setupTableRowCount(String tableName, long rowCount) throws Exception {
        catalog.alterTableStatistics(
                new ObjectPath(util.getTableEnv().getCurrentDatabase(), tableName),
                new CatalogTableStatistics(rowCount, 1, 1L, 1L),
                false);
    }

    /**
     * Registers long-typed column statistics with the given number of distinct values.
     *
     * @param tableName table that owns the column
     * @param columnName column whose statistics are altered
     * @param ndv number of distinct values to register; the other arguments are fixed placeholders
     * @throws Exception if the catalog cannot alter the column statistics
     */
    private void setupTableColumnNdv(String tableName, String columnName, long ndv)
            throws Exception {
        CatalogColumnStatisticsDataLong longColStats =
                new CatalogColumnStatisticsDataLong(-123L, 763322L, ndv, 79L);
        catalog.alterTableColumnStatistics(
                new ObjectPath(util.getTableEnv().getCurrentDatabase(), tableName),
                new CatalogColumnStatistics(Collections.singletonMap(columnName, longColStats)),
                false);
    }
}

