/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.python;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.client.python.PythonEnvUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.util.FileUtils;

public class PythonFunctionFactoryTest {
    private static String tmpdir = "";
    private static StreamTableEnvironment tableEnv;
    private static Table sourceTable;

    public static void main(String[] args) throws Exception {
        PythonFunctionFactoryTest.prepareEnvironment();
        PythonFunctionFactoryTest.testPythonFunctionFactory();
        PythonFunctionFactoryTest.cleanEnvironment();
    }

    public static void prepareEnvironment() throws Exception {
        tmpdir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString()).getAbsolutePath();
        new File(tmpdir).mkdir();
        File pyFilePath = new File(tmpdir, "test1.py");
        try (FileOutputStream out = new FileOutputStream(pyFilePath);){
            String code = "from pyflink.table.udf import udf\nfrom pyflink.table import DataTypes\n@udf(input_types=DataTypes.STRING(), result_type=DataTypes.STRING())\ndef func1(str):\n    return str + str\n";
            ((OutputStream)out).write(code.getBytes());
        }
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        tableEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)sEnv);
        tableEnv.getConfig().set(PythonOptions.PYTHON_FILES, (Object)pyFilePath.getAbsolutePath());
        tableEnv.getConfig().set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, (Object)MemorySize.parse((String)"80mb"));
        sourceTable = tableEnv.fromDataStream((DataStream)sEnv.fromData((Object[])new String[]{"1", "2", "3"})).as("str", new String[0]);
    }

    public static void cleanEnvironment() throws Exception {
        PythonFunctionFactoryTest.closeStartedPythonProcess();
        FileUtils.deleteDirectory((File)new File(tmpdir));
    }

    public static void testPythonFunctionFactory() {
        tableEnv.executeSql("create function func1 as 'test1.func1' language python");
        PythonFunctionFactoryTest.verifyPlan(sourceTable.select(new Expression[]{Expressions.call((String)"func1", (Object[])new Object[]{Expressions.$((String)"str")})}), (TableEnvironment)tableEnv);
        tableEnv.executeSql("alter function func1 as 'test1.func1' language python");
        PythonFunctionFactoryTest.verifyPlan(sourceTable.select(new Expression[]{Expressions.call((String)"func1", (Object[])new Object[]{Expressions.$((String)"str")})}), (TableEnvironment)tableEnv);
        tableEnv.executeSql("create temporary function func1 as 'test1.func1' language python");
        PythonFunctionFactoryTest.verifyPlan(sourceTable.select(new Expression[]{Expressions.call((String)"func1", (Object[])new Object[]{Expressions.$((String)"str")})}), (TableEnvironment)tableEnv);
        tableEnv.executeSql("create temporary system function func1 as 'test1.func1' language python");
        PythonFunctionFactoryTest.verifyPlan(sourceTable.select(new Expression[]{Expressions.call((String)"func1", (Object[])new Object[]{Expressions.$((String)"str")})}), (TableEnvironment)tableEnv);
    }

    private static void verifyPlan(Table table, TableEnvironment tableEnvironment) {
        String expected;
        String plan = table.explain(new ExplainDetail[0]);
        if (!plan.contains(expected = "PythonCalc(select=[func1(f0) AS _c0])")) {
            throw new AssertionError((Object)String.format("This plan does not contains \"%s\":\n%s", expected, plan));
        }
    }

    private static void closeStartedPythonProcess() throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
        Class<?> clazz = Class.forName("java.lang.ApplicationShutdownHooks");
        Field field = clazz.getDeclaredField("hooks");
        field.setAccessible(true);
        Map hooks = (Map)field.get(null);
        PythonEnvUtils.PythonProcessShutdownHook shutdownHook = null;
        for (Thread t : hooks.keySet()) {
            if (!(t instanceof PythonEnvUtils.PythonProcessShutdownHook)) continue;
            shutdownHook = (PythonEnvUtils.PythonProcessShutdownHook)t;
            break;
        }
        if (shutdownHook != null) {
            shutdownHook.run();
            hooks.remove(shutdownHook);
        }
    }
}

