/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service.application;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
import org.apache.flink.client.deployment.application.ApplicationDispatcherLeaderProcessFactoryFactory;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.JobRestEndpointFactory;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.table.gateway.service.utils.MockHttpServer;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.runtime.application.SqlDriver;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class ScriptRunnerITCase {
    private static Map<String, String> originalEnv;
    private static File udfJar;
    private OutputStream outputStream;

    ScriptRunnerITCase() {
    }

    @BeforeAll
    static void beforeAll(@TempDir File flinkHome, @TempDir Path functionHome) throws Exception {
        originalEnv = System.getenv();
        File confYaml = new File(flinkHome, "config.yaml");
        if (!confYaml.createNewFile()) {
            throw new IOException("Can't create testing config.yaml file.");
        }
        HashMap<String, String> map = new HashMap<String, String>(System.getenv());
        map.put("FLINK_CONF_DIR", flinkHome.getAbsolutePath());
        org.apache.flink.core.testutils.CommonTestUtils.setEnv(map);
        HashMap<String, String> classNameCodes = new HashMap<String, String>();
        classNameCodes.put("LowerUDF", String.format("public class %s extends org.apache.flink.table.functions.ScalarFunction {\n  public String eval(String str) {\n    return str.toLowerCase();\n  }\n}\n", "LowerUDF"));
        classNameCodes.put("UpperUDF", String.format("public class %s extends org.apache.flink.table.functions.ScalarFunction {\n  public String eval(String str) {\n    return str.toUpperCase();\n  }\n}\n", "UpperUDF"));
        udfJar = UserClassLoaderJarTestUtils.createJarFile((File)Files.createTempDirectory(functionHome, "test-jar", new FileAttribute[0]).toFile(), (String)"test-classloader-udf.jar", classNameCodes);
    }

    @BeforeEach
    void beforeEach() {
        this.outputStream = new ByteArrayOutputStream(1024);
        SqlDriver.enableTestMode((OutputStream)this.outputStream);
    }

    @AfterEach
    void afterEach() throws Exception {
        this.outputStream.close();
        SqlDriver.disableTestMode();
    }

    @AfterAll
    static void afterAll() {
        org.apache.flink.core.testutils.CommonTestUtils.setEnv(originalEnv);
    }

    @Test
    void testRunScriptFromFile(@TempDir Path workDir) throws Exception {
        String script = String.format("CREATE TEMPORARY TABLE sink(\n  a STRING\n) WITH (\n  'connector' = 'values'\n);\nADD JAR '%s';\nCREATE TEMPORARY FUNCTION lower_func AS '%s';\nCREATE TEMPORARY VIEW v(c) AS VALUES ('A'), ('B'), ('C');\nINSERT INTO sink SELECT lower_func(c) FROM v;", udfJar.getAbsolutePath(), "LowerUDF");
        List<String> arguments = Arrays.asList("--scriptUri", ScriptRunnerITCase.createStatementFile(workDir, script).toString());
        this.runScriptInCluster(arguments);
        Assertions.assertThat((List)TestValuesTableFactory.getResultsAsStrings((String)"sink")).containsExactly((Object[])new String[]{"+I[a]", "+I[b]", "+I[c]"});
    }

    @Test
    void testRunScriptFromRemoteFile(@TempDir Path workDir) throws Exception {
        String script = String.format("CREATE TEMPORARY TABLE sink(\n  a STRING\n) WITH (\n  'connector' = 'values'\n);\nADD JAR '%s';\nCREATE TEMPORARY FUNCTION lower_func AS '%s';\nCREATE TEMPORARY VIEW v(c) AS VALUES ('A'), ('B'), ('C');\nINSERT INTO sink SELECT lower_func(c) FROM v;", udfJar.getAbsolutePath(), "LowerUDF");
        File file = ScriptRunnerITCase.createStatementFile(workDir, script).toFile();
        try (MockHttpServer server = MockHttpServer.startHttpServer();){
            URL url = server.prepareResource("/download/script.sql", file);
            List<String> arguments = Arrays.asList("--scriptUri", url.toString());
            this.runScriptInCluster(arguments);
        }
    }

    @Test
    void testRunScript() throws Exception {
        List<String> arguments = Arrays.asList("--script", String.format("CREATE TEMPORARY TABLE sink(\n  a STRING\n) WITH (\n  'connector' = 'values'\n);\nCREATE TEMPORARY FUNCTION upper_func AS '%s' USING JAR '%s';\nCREATE TEMPORARY VIEW v(c) AS VALUES ('a'), ('b'), ('c');\nINSERT INTO sink SELECT upper_func(c) FROM v;", "UpperUDF", udfJar.getAbsolutePath()));
        this.runScriptInCluster(arguments);
        Assertions.assertThat((List)TestValuesTableFactory.getResultsAsStrings((String)"sink")).containsExactly((Object[])new String[]{"+I[A]", "+I[B]", "+I[C]"});
    }

    void runScriptInCluster(List<String> arguments) throws Exception {
        JobID jobID = JobID.generate();
        Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, (Object)"embedded");
        configuration.set(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH, (Object)false);
        configuration.set(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, (Object)true);
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, (Object)jobID.toHexString());
        TestingMiniClusterConfiguration clusterConfiguration = TestingMiniClusterConfiguration.newBuilder().setConfiguration(configuration).build();
        PackagedProgram.Builder builder = PackagedProgram.newBuilder().setEntryPointClassName(SqlDriver.class.getName()).setArguments(arguments.toArray(new String[0])).setUserClassPaths(ExecutionConfigAccessor.fromConfiguration((Configuration)configuration).getClasspaths());
        TestingMiniCluster.Builder clusterBuilder = TestingMiniCluster.newBuilder((TestingMiniClusterConfiguration)clusterConfiguration).setDispatcherResourceManagerComponentFactorySupplier(ScriptRunnerITCase.createApplicationModeDispatcherResourceManagerComponentFactorySupplier((Configuration)clusterConfiguration.getConfiguration(), builder.build()));
        try (TestingMiniCluster cluster = clusterBuilder.build();){
            cluster.start();
            ScriptRunnerITCase.awaitJobStatus((MiniCluster)cluster, jobID, JobStatus.FINISHED);
        }
    }

    private static Path createStatementFile(Path workDir, String script) throws Exception {
        File file = new File(workDir.toString(), "statement.sql");
        Assertions.assertThat((boolean)file.createNewFile()).isTrue();
        FileUtils.writeFileUtf8((File)file, (String)script);
        return file.toPath();
    }

    private static Supplier<DispatcherResourceManagerComponentFactory> createApplicationModeDispatcherResourceManagerComponentFactorySupplier(Configuration configuration, PackagedProgram program) {
        return () -> {
            ApplicationDispatcherLeaderProcessFactoryFactory applicationDispatcherLeaderProcessFactoryFactory = ApplicationDispatcherLeaderProcessFactoryFactory.create((Configuration)new Configuration(configuration), (DispatcherFactory)SessionDispatcherFactory.INSTANCE, (PackagedProgram)program);
            return new DefaultDispatcherResourceManagerComponentFactory((DispatcherRunnerFactory)new DefaultDispatcherRunnerFactory((DispatcherLeaderProcessFactoryFactory)applicationDispatcherLeaderProcessFactoryFactory), (ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance(), (RestEndpointFactory)JobRestEndpointFactory.INSTANCE);
        };
    }

    private static void awaitJobStatus(MiniCluster cluster, JobID jobId, JobStatus status) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            try {
                return cluster.getJobStatus(jobId).get() == status;
            }
            catch (ExecutionException e) {
                if (ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent()) {
                    return false;
                }
                throw e;
            }
        }, (long)500L, (int)60);
    }
}

