/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql;

import java.util.EnumSet;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.ArgumentTrait;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;
import org.apache.flink.table.types.inference.StaticArgument;
import org.apache.flink.table.types.inference.StaticArgumentTrait;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class ProcessTableFunctionTest
extends TableTestBase {
    private TableTestUtil util;

    @BeforeEach
    void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.util.tableEnv().executeSql("CREATE VIEW t1 AS SELECT 'Bob' AS name, 12 AS score");
        this.util.tableEnv().executeSql("CREATE VIEW t2 AS SELECT 'Bob' AS name, 12 AS different");
        this.util.tableEnv().executeSql("CREATE VIEW t3 AS SELECT 'Bob' AS name, TRUE AS isValid");
    }

    @Test
    void testScalarArgsNoUid() {
        this.util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        this.util.verifyRelPlan("SELECT * FROM f(i => 1, b => true)");
    }

    @Test
    void testScalarArgsWithUid() {
        this.util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        this.util.verifyRelPlan("SELECT * FROM f(uid => 'my-uid', i => 1, b => true)");
    }

    @Test
    void testUnknownScalarArg() {
        this.util.addTemporarySystemFunction("f", ScalarArgsFunction.class);
        this.util.verifyRelPlan("SELECT * FROM f(i => 1, b => true, invalid => 'invalid')");
    }

    @Test
    void testTableAsRow() {
        this.util.addTemporarySystemFunction("f", TableAsRowFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
    }

    @Test
    void testTypedTableAsRow() {
        this.util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(u => TABLE t1, i => 1)");
    }

    @Test
    void testTypedTableAsRowIgnoringColumnNames() {
        this.util.addTemporarySystemFunction("f", TypedTableAsRowFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(u => TABLE t2, i => 1)");
    }

    @Test
    void testTableAsSet() {
        this.util.addTemporarySystemFunction("f", TableAsSetFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)");
    }

    @Test
    void testTableAsSetOptionalPartitionBy() {
        this.util.addTemporarySystemFunction("f", TableAsSetOptionalPartitionFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
    }

    @Test
    void testTypedTableAsSet() {
        this.util.addTemporarySystemFunction("f", TypedTableAsSetFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(u => TABLE t1 PARTITION BY name, i => 1)");
    }

    @Test
    void testEmptyArgs() {
        this.util.addTemporarySystemFunction("f", EmptyArgFunction.class);
        this.util.verifyRelPlan("SELECT * FROM f(uid => 'my-ptf')");
    }

    @Test
    void testPojoArgs() {
        this.util.addTemporarySystemFunction("f", PojoArgsFunction.class);
        this.util.addTemporarySystemFunction("pojoCreator", PojoCreatingFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(input => TABLE t1, scalar => pojoCreator('Bob', 12), uid => 'my-ptf')");
    }

    @Test
    void testTableAsSetPassThroughColumns() {
        this.util.addTemporarySystemFunction("f", TableAsSetPassThroughFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)");
    }

    @Test
    void testTableAsRowPassThroughColumns() {
        this.util.addTemporarySystemFunction("f", TableAsRowPassThroughFunction.class);
        this.assertReachesOptimizer("SELECT * FROM f(r => TABLE t1, i => 1)");
    }

    @ParameterizedTest
    @MethodSource(value={"errorSpecs"})
    void testErrorBehavior(ErrorSpec spec) {
        this.util.addTemporarySystemFunction("f", spec.functionClass);
        Assertions.assertThatThrownBy(() -> this.util.verifyRelPlan(spec.sql)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)spec.errorMessage)});
    }

    private static Stream<ErrorSpec> errorSpecs() {
        return Stream.of(ErrorSpec.of("invalid uid", ScalarArgsFunction.class, "SELECT * FROM f(uid => '%', i => 1, b => true)", "Invalid unique identifier for process table function. The 'uid' argument must be a string literal that follows the pattern [a-zA-Z_][a-zA-Z-_0-9]*. But found: %"), ErrorSpec.of("typed table as row with invalid input", TypedTableAsRowFunction.class, "SELECT * FROM f(u => TABLE t3, i => 1)", "No match found for function signature f(<RecordType(CHAR(3) name, BOOLEAN isValid)>, <NUMERIC>, <CHARACTER>)"), ErrorSpec.of("table as set with missing partition by", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t1, i => 1)", "Table argument 'r' requires a PARTITION BY clause for parallel processing."), ErrorSpec.of("typed table as set with invalid input", TypedTableAsSetFunction.class, "SELECT * FROM f(u => TABLE t3 PARTITION BY name, i => 1)", "No match found for function signature f(<RecordType(CHAR(3) name, BOOLEAN isValid)>, <NUMERIC>, <CHARACTER>)"), ErrorSpec.of("table function instead of process table function", NoProcessTableFunction.class, "SELECT * FROM f(r => TABLE t1)", "Only scalar arguments are supported at this location. But argument 'r' declared the following traits: [TABLE, TABLE_AS_ROW]"), ErrorSpec.of("reserved args", ReservedArgFunction.class, "SELECT * FROM f(uid => 'my-ptf')", "Function signature must not declare system arguments. Reserved argument names are: [uid]"), ErrorSpec.of("multiple table args", MultiTableFunction.class, "SELECT * FROM f(r1 => TABLE t1, r2 => TABLE t1)", "Currently, only signatures with at most one table argument are supported."), ErrorSpec.of("row instead of table", TableAsRowFunction.class, "SELECT * FROM f(r => ROW(42), i => 1)", "Invalid argument value. Argument 'r' expects a table to be passed."), ErrorSpec.of("table as row partition by", TableAsRowFunction.class, "SELECT * FROM f(r => TABLE t1 PARTITION BY name, i => 1)", "Only tables with set semantics may be partitioned. Invalid PARTITION BY clause in the 0-th operand of table function 'f'"), ErrorSpec.of("invalid partition by clause", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t1 PARTITION BY invalid, i => 1)", "Invalid column 'invalid' for PARTITION BY clause. Available columns are: [name, score]"), ErrorSpec.of("unsupported order by", TableAsSetFunction.class, "SELECT * FROM f(r => TABLE t1 PARTITION BY name ORDER BY score, i => 1)", "ORDER BY clause is currently not supported."));
    }

    private void assertReachesOptimizer(String sql) {
        Assertions.assertThatThrownBy(() -> this.util.verifyRelPlan(sql)).hasMessageContaining("This exception indicates that the query uses an unsupported SQL feature.");
    }

    public static class ScalarArgsFunction
    extends ProcessTableFunction<String> {
        public void eval(Integer i, Boolean b) {
        }
    }

    public static class TableAsRowFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_ROW}) Row r, Integer i) {
        }
    }

    public static class TypedTableAsRowFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_ROW}) User u, Integer i) {
        }
    }

    public static class TableAsSetFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_SET}) Row r, Integer i) {
        }
    }

    public static class TableAsSetOptionalPartitionFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY}) Row r, Integer i) {
        }
    }

    public static class TypedTableAsSetFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_SET}) User u, Integer i) {
        }
    }

    public static class EmptyArgFunction
    extends ProcessTableFunction<String> {
        public void eval() {
        }
    }

    public static class PojoArgsFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_ROW}) User input, User scalar) {
        }
    }

    public static class PojoCreatingFunction
    extends ScalarFunction {
        public User eval(String s, Integer i) {
            return new User(s, i);
        }
    }

    public static class TableAsSetPassThroughFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_SET, ArgumentTrait.PASS_COLUMNS_THROUGH}) Row r, Integer i) {
        }
    }

    public static class TableAsRowPassThroughFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_ROW, ArgumentTrait.PASS_COLUMNS_THROUGH}) Row r, Integer i) {
        }
    }

    private static class ErrorSpec {
        private final String description;
        private final Class<? extends UserDefinedFunction> functionClass;
        private final String sql;
        private final String errorMessage;

        private ErrorSpec(String description, Class<? extends UserDefinedFunction> functionClass, String sql, String errorMessage) {
            this.description = description;
            this.functionClass = functionClass;
            this.sql = sql;
            this.errorMessage = errorMessage;
        }

        static ErrorSpec of(String description, Class<? extends UserDefinedFunction> functionClass, String sql, String errorMessage) {
            return new ErrorSpec(description, functionClass, sql, errorMessage);
        }

        public String toString() {
            return this.description;
        }
    }

    public static class NoProcessTableFunction
    extends TableFunction<String> {
        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            return TypeInference.newBuilder().staticArguments(new StaticArgument[]{StaticArgument.table((String)"r", Row.class, (boolean)false, EnumSet.of(StaticArgumentTrait.TABLE_AS_ROW))}).outputTypeStrategy(callContext -> Optional.of(DataTypes.STRING())).build();
        }

        public void eval(Row r) {
        }
    }

    public static class ReservedArgFunction
    extends ProcessTableFunction<String> {
        public void eval(String uid) {
        }
    }

    public static class MultiTableFunction
    extends ProcessTableFunction<String> {
        public void eval(@ArgumentHint(value={ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY}) Row r1, @ArgumentHint(value={ArgumentTrait.TABLE_AS_SET, ArgumentTrait.OPTIONAL_PARTITION_BY}) Row r2) {
        }
    }

    public static class User {
        public String s;
        public Integer i;

        public User(String s, Integer i) {
            this.s = s;
            this.i = i;
        }
    }
}

