/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.codegen;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.planner.calcite.SqlToRexConverter;
import org.apache.flink.table.planner.codegen.AsyncCodeGenerator;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.PlannerMocks;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.generated.GeneratedFunction;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link AsyncCodeGenerator}.
 *
 * <p>Each test converts one or more SQL expressions into Rex nodes, generates an
 * {@link AsyncFunction} from them, invokes it on a single input row and checks either the
 * produced row or the propagated error.
 */
public class AsyncCodeGeneratorTest {
    /** Schema of the input row handed to every generated function: (INT, BIGINT, VARCHAR). */
    private static final RowType INPUT_TYPE = RowType.of(new IntType(), new BigIntType(), new VarCharType());

    private PlannerMocks plannerMocks;
    private SqlToRexConverter converter;
    private RelDataType tableRowType;

    /**
     * Builds fresh planner mocks, a converter bound to a table with columns {@code f1}, {@code f2},
     * {@code f3}, and registers the two async functions used by the tests.
     */
    @BeforeEach
    public void before() {
        this.plannerMocks = PlannerMocks.create();
        this.tableRowType = this.plannerMocks
                .getPlannerContext()
                .getTypeFactory()
                .buildRelNodeRowType(
                        JavaScalaConversionUtil.toScala(Arrays.asList("f1", "f2", "f3")),
                        JavaScalaConversionUtil.toScala(Arrays.asList(new IntType(), new BigIntType(), new VarCharType())));
        // The context is unwrapped exactly once; a previous duplicate call whose result was
        // discarded has been removed.
        this.converter = ShortcutUtils.unwrapContext(this.plannerMocks.getPlanner().createToRelContext().getCluster())
                .getRexFactory()
                .createSqlToRexConverter(this.tableRowType, null);
        // NOTE(review): the functions are registered in lower case but invoked as "myFunc" /
        // "myFunc_error" below; presumably the catalog lookup is case-insensitive - confirm.
        this.plannerMocks.getFunctionCatalog().registerTemporarySystemFunction("myfunc", new AsyncFunc(), false);
        this.plannerMocks.getFunctionCatalog().registerTemporarySystemFunction("myfunc_error", new AsyncFuncError(), false);
    }

    /** A single async call yields a one-column row holding the string built by {@link AsyncFunc}. */
    @Test
    public void testStringReturnType() throws Exception {
        RowData rowData = this.execute(
                "myFunc(f1, f2, f3)",
                RowType.of(new VarCharType()),
                GenericRowData.of(2, 3L, StringData.fromString("foo")));
        Assertions.assertThat(rowData).isEqualTo(GenericRowData.of(StringData.fromString("complete foo 4 6")));
    }

    /** A pass-through column placed before the async call keeps its position in the result. */
    @Test
    public void testTwoReturnTypes_passThroughFirst() throws Exception {
        // The result type mirrors the projection order: f2 (BIGINT) first, then the VARCHAR call.
        RowData rowData = this.execute(
                Arrays.asList("f2", "myFunc(f1, f2, f3)"),
                RowType.of(new BigIntType(), new VarCharType()),
                GenericRowData.of(2, 3L, StringData.fromString("foo")));
        Assertions.assertThat(rowData).isEqualTo(GenericRowData.of(3L, StringData.fromString("complete foo 4 6")));
    }

    /** A pass-through column placed after the async call keeps its position in the result. */
    @Test
    public void testTwoReturnTypes_passThroughSecond() throws Exception {
        RowData rowData = this.execute(
                Arrays.asList("myFunc(f1, f2, f3)", "f2"),
                RowType.of(new VarCharType(), new BigIntType()),
                GenericRowData.of(2, 3L, StringData.fromString("foo")));
        Assertions.assertThat(rowData).isEqualTo(GenericRowData.of(StringData.fromString("complete foo 4 6"), 3L));
    }

    /** An exception reported by the async function completes the result future exceptionally. */
    @Test
    public void testError() throws Exception {
        // Only one expression is projected, so the result type has exactly one field.
        CompletableFuture<Collection<RowData>> future = this.executeFuture(
                Arrays.asList("myFunc_error(f1, f2, f3)"),
                RowType.of(new VarCharType()),
                GenericRowData.of(2, 3L, StringData.fromString("foo")));
        Assertions.assertThat(future).isCompletedExceptionally();
        Assertions.assertThatThrownBy(future::get).cause().hasMessage("Error!");
    }

    /**
     * Convenience overload of {@link #execute(List, RowType, RowData)} for a single expression.
     *
     * @param sqlExpression the SQL expression to project
     * @param resultType row type of the generated function's output
     * @param input the row passed to the generated function
     * @return the single row produced by the generated function
     */
    private RowData execute(String sqlExpression, RowType resultType, RowData input) throws Exception {
        return this.execute(Arrays.asList(sqlExpression), resultType, input);
    }

    /**
     * Runs the generated function, waits for its result and asserts that exactly one row was
     * emitted.
     *
     * @param sqlExpressions the SQL expressions to project, in output order
     * @param resultType row type of the generated function's output
     * @param input the row passed to the generated function
     * @return the single row produced by the generated function
     */
    private RowData execute(List<String> sqlExpressions, RowType resultType, RowData input) throws Exception {
        Collection<RowData> result = this.executeFuture(sqlExpressions, resultType, input).get();
        Assertions.assertThat(result).hasSize(1);
        return result.iterator().next();
    }

    /**
     * Converts the expressions to Rex nodes, generates and instantiates the async function, and
     * invokes it on {@code input}.
     *
     * @param sqlExpressions the SQL expressions to project, in output order
     * @param resultType row type of the generated function's output
     * @param input the row passed to the generated function
     * @return the future backing the {@link TestResultFuture} handed to the function; it is not
     *     awaited here, so callers can inspect both normal and exceptional completion
     */
    private CompletableFuture<Collection<RowData>> executeFuture(List<String> sqlExpressions, RowType resultType, RowData input) throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        GeneratedFunction<AsyncFunction<RowData, RowData>> function = AsyncCodeGenerator.generateFunction(
                "name",
                INPUT_TYPE,
                resultType,
                sqlExpressions.stream().map(sql -> this.converter.convertToRexNode(sql)).collect(Collectors.toList()),
                true,
                new Configuration(),
                classLoader);
        AsyncFunction<RowData, RowData> asyncFunction = function.newInstance(classLoader);
        TestResultFuture resultFuture = new TestResultFuture();
        asyncFunction.asyncInvoke(input, resultFuture);
        return resultFuture.getResult();
    }

    /**
     * Async function that immediately completes with
     * {@code "complete " + s + " " + i * i + " " + 2 * l}.
     */
    public static final class AsyncFunc
    extends AsyncScalarFunction {
        public void eval(CompletableFuture<String> f, Integer i, Long l, String s) {
            f.complete("complete " + s + " " + i * i + " " + 2L * l);
        }
    }

    /** Async function that always fails with a {@link RuntimeException} carrying "Error!". */
    public static final class AsyncFuncError
    extends AsyncScalarFunction {
        public void eval(CompletableFuture<String> f, Integer i, Long l, String s) {
            f.completeExceptionally(new RuntimeException("Error!"));
        }
    }

    /** {@link ResultFuture} that forwards every completion to a {@link CompletableFuture}. */
    public static final class TestResultFuture
    implements ResultFuture<RowData> {
        CompletableFuture<Collection<RowData>> data = new CompletableFuture<>();

        @Override
        public void complete(Collection<RowData> result) {
            this.data.complete(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.data.completeExceptionally(error);
        }

        /** Returns the future that mirrors the outcome reported to this result future. */
        public CompletableFuture<Collection<RowData>> getResult() {
            return this.data;
        }
    }
}

