/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.rest;

import java.io.File;
import java.net.URL;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
import org.apache.flink.table.gateway.rest.message.application.DeployScriptResponseBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.gateway.service.utils.MockHttpServer;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.runtime.application.SqlDriver;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class DeployScriptITCase {
    /**
     * SQL script submitted by every test case: declares a temporary {@code blackhole} sink table
     * and inserts a single row into it.
     */
    private static final String script = "CREATE TEMPORARY TABLE sink(\n  a INT\n) WITH (\n  'connector' = 'blackhole'\n);\nINSERT INTO sink VALUES (1);";

    /** Gateway service extension, ordered with {@code @Order(1)}; built from a fresh {@link Configuration} supplier. */
    @Order(1)
    @RegisterExtension
    public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
            new SqlGatewayServiceExtension(Configuration::new);

    /**
     * REST endpoint extension, ordered with {@code @Order(2)}; it obtains its service from
     * {@link #SQL_GATEWAY_SERVICE_EXTENSION}, so it must be declared after that field.
     */
    @Order(2)
    @RegisterExtension
    private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
            new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);

    /** REST client shared by all test cases; assigned in {@link #beforeAll()}. */
    private static TestingRestClient restClient;

    /** Handle of the session opened in {@link #beforeAll()} and reused by every deploy request. */
    private static SessionHandle sessionHandle;

    /**
     * Creates the shared REST client and opens the gateway session used by all test cases.
     *
     * <p>The open-session request is built from {@code "test"} and the single entry
     * {@code key -> value}; {@code verifyDeployScriptToCluster} later asserts that this entry is
     * present in the configuration handed to the test cluster client factory.
     *
     * @throws Exception if the request fails or the response cannot be obtained
     */
    @BeforeAll
    static void beforeAll() throws Exception {
        restClient = TestingRestClient.getTestingRestClient();

        // The raw casts keep the call on the same (erased) sendRequest signature as before,
        // which is why the result of get() has to be cast to the concrete response type.
        OpenSessionResponseBody openedSession = (OpenSessionResponseBody) restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        (MessageHeaders) OpenSessionHeaders.getInstance(),
                        (MessageParameters) EmptyMessageParameters.getInstance(),
                        (RequestBody) new OpenSessionRequestBody("test", Collections.singletonMap("key", "value")))
                .get();

        UUID sessionId = UUID.fromString(openedSession.getSessionHandle());
        sessionHandle = new SessionHandle(sessionId);
    }

    /**
     * Deploys the script to the {@code yarn-application} target twice: first passing the script
     * text inline, then passing a URI that points at the same script served by a mock HTTP server.
     *
     * @param workDir temporary directory in which the script file is created
     * @throws Exception if the mock server, the file setup or a deploy request fails
     */
    @Test
    void testDeployScriptToYarnCluster(@TempDir Path workDir) throws Exception {
        final String target = "yarn-application";

        // Phase 1: script content travels in the request body.
        this.verifyDeployScriptToCluster(target, script, null, script);

        // Phase 2: only a URI is sent; the expected content is still the same script.
        try (MockHttpServer httpServer = MockHttpServer.startHttpServer()) {
            File scriptFile = workDir.resolve("script.sql").toFile();
            boolean created = scriptFile.createNewFile();
            Assertions.assertThat(created).isTrue();
            FileUtils.writeFileUtf8(scriptFile, script);

            URL scriptUrl = httpServer.prepareResource("/download/script.sql", scriptFile);
            this.verifyDeployScriptToCluster(target, null, scriptUrl.toString(), script);
        }
    }

    /**
     * Deploys the script to the {@code kubernetes-application} target twice: first passing the
     * script text inline, then passing a URI that points at the same script served by a mock HTTP
     * server.
     *
     * @param workDir temporary directory in which the script file is created
     * @throws Exception if the mock server, the file setup or a deploy request fails
     */
    @Test
    void testDeployScriptToKubernetesCluster(@TempDir Path workDir) throws Exception {
        final String target = "kubernetes-application";

        // Phase 1: script content travels in the request body.
        this.verifyDeployScriptToCluster(target, script, null, script);

        // Phase 2: only a URI is sent; the expected content is still the same script.
        try (MockHttpServer httpServer = MockHttpServer.startHttpServer()) {
            File scriptFile = workDir.resolve("script.sql").toFile();
            boolean created = scriptFile.createNewFile();
            Assertions.assertThat(created).isTrue();
            FileUtils.writeFileUtf8(scriptFile, script);

            URL scriptUrl = httpServer.prepareResource("/download/script.sql", scriptFile);
            this.verifyDeployScriptToCluster(target, null, scriptUrl.toString(), script);
        }
    }

    /**
     * Sends one deploy-script request for {@code target} and checks what reached the test cluster
     * client factory and descriptor.
     *
     * <p>Checks, in order: the returned cluster id is {@code "test"}; the configuration captured
     * by the factory contains {@code key=value}; the captured application class name is
     * {@link SqlDriver}; and the program arguments parse back to {@code content}.
     *
     * @param target deployment target; also stored in the test factory's {@code id}
     * @param script inline script text for the request, or {@code null}
     * @param scriptUri script URI for the request, or {@code null}
     * @param content script text expected from {@code SqlDriver.parseOptions}
     * @throws Exception if the request fails or the response cannot be obtained
     */
    private void verifyDeployScriptToCluster(String target, @Nullable String script, @Nullable String scriptUri, String content) throws Exception {
        // Store the target in the test factory; its isCompatibleWith compares against this value.
        TestApplicationClusterClientFactory.id = target;

        DeployScriptResponseBody deployResponse = (DeployScriptResponseBody) restClient.sendRequest(
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                        SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                        (MessageHeaders) DeployScriptHeaders.getInstance(),
                        (MessageParameters) new SessionMessageParameters(sessionHandle),
                        (RequestBody) new DeployScriptRequestBody(script, scriptUri, Collections.singletonMap(DeploymentOptions.TARGET.key(), target)))
                .get();
        Assertions.assertThat((String) deployResponse.getClusterID()).isEqualTo("test");

        // Values captured by the test doubles while the request was handled.
        ApplicationConfiguration capturedApplication = TestApplicationClusterDescriptor.applicationConfiguration;

        String propagatedValue = (String) TestApplicationClusterClientFactory.configuration.getString("key", "none");
        Assertions.assertThat(propagatedValue).isEqualTo("value");

        String applicationClass = (String) capturedApplication.getApplicationClassName();
        Assertions.assertThat(applicationClass).isEqualTo(SqlDriver.class.getName());

        String deployedScript = (String) SqlDriver.parseOptions((String[]) capturedApplication.getProgramArguments());
        Assertions.assertThat(deployedScript).isEqualTo(content);
    }

    /**
     * Test {@link ClusterClientFactory} that records the {@link Configuration} it is asked to
     * create a cluster descriptor for, and always hands out the shared
     * {@code TestApplicationClusterDescriptor} instance.
     *
     * <p>The factory reports itself compatible only with the deployment target currently stored
     * in {@link #id}, which lets one factory stand in for several targets.
     */
    public static class TestApplicationClusterClientFactory<ClusterID>
    implements ClusterClientFactory {
        /**
         * Deployment target this factory currently claims to support.
         *
         * <p>Written by the test before each deploy request and read in
         * {@link #isCompatibleWith(Configuration)} (presumably on a gateway thread), so it is
         * {@code volatile} like the other statics used to hand values between the test and the
         * code under test.
         */
        public static volatile String id;

        /** Configuration most recently passed to {@link #createClusterDescriptor(Configuration)}. */
        private static volatile Configuration configuration;

        /**
         * @param configuration configuration whose {@link DeploymentOptions#TARGET} is inspected
         * @return {@code true} if the configured target equals {@link #id} (null-safe comparison)
         */
        public boolean isCompatibleWith(Configuration configuration) {
            return Objects.equals(id, configuration.get(DeploymentOptions.TARGET));
        }

        /**
         * Records {@code configuration} for later assertions and returns the shared descriptor.
         *
         * @param configuration configuration to capture
         * @return the singleton test descriptor
         */
        public ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration) {
            TestApplicationClusterClientFactory.configuration = configuration;
            return TestApplicationClusterDescriptor.INSTANCE;
        }

        /**
         * @param configuration ignored
         * @return the fixed id {@code "test-application"}
         */
        @Nullable
        public String getClusterId(Configuration configuration) {
            return "test-application";
        }

        /**
         * @param configuration ignored
         * @return a cluster specification built with the builder's defaults
         */
        public ClusterSpecification getClusterSpecification(Configuration configuration) {
            return new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();
        }
    }

    /**
     * Test {@link ClusterDescriptor} that supports only application-cluster deployment.
     *
     * <p>It records the {@link ApplicationConfiguration} it is given and returns a provider of
     * {@code TestClusterClient}. Retrieval, session deployment and killing a cluster are not
     * supported.
     */
    private static class TestApplicationClusterDescriptor<T>
    implements ClusterDescriptor<T> {
        /** Application configuration most recently passed to {@link #deployApplicationCluster}. */
        static volatile ApplicationConfiguration applicationConfiguration;

        /** Single shared instance; kept as a raw type so it can be returned for any cluster id type. */
        private static final TestApplicationClusterDescriptor INSTANCE = new TestApplicationClusterDescriptor();

        private TestApplicationClusterDescriptor() {
        }

        // ---- supported operations ----

        /**
         * Captures {@code applicationConfiguration} and returns a provider that creates a new
         * {@code TestClusterClient} on every call.
         *
         * @param clusterSpecification ignored
         * @param applicationConfiguration configuration to capture for later assertions
         */
        public ClusterClientProvider<T> deployApplicationCluster(ClusterSpecification clusterSpecification, ApplicationConfiguration applicationConfiguration) {
            TestApplicationClusterDescriptor.applicationConfiguration = applicationConfiguration;

            ClusterClientProvider<T> clientProvider = new ClusterClientProvider<T>(){

                public ClusterClient<T> getClusterClient() {
                    return new TestClusterClient();
                }
            };
            return clientProvider;
        }

        /** @return a fixed human-readable description of this descriptor */
        public String getClusterDescription() {
            return "Test Application Cluster Descriptor";
        }

        /** Nothing to release. */
        public void close() {
        }

        // ---- unsupported operations ----

        /** @throws UnsupportedOperationException always */
        public ClusterClientProvider<T> retrieve(T clusterId) {
            throw new UnsupportedOperationException();
        }

        /** @throws UnsupportedOperationException always */
        public ClusterClientProvider<T> deploySessionCluster(ClusterSpecification clusterSpecification) {
            throw new UnsupportedOperationException();
        }

        /** @throws UnsupportedOperationException always */
        public void killCluster(T clusterId) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Minimal {@link ClusterClient} stub.
     *
     * <p>Only {@link #getClusterId()} (fixed value {@code "test"}) and {@link #close()} (no-op)
     * are usable; every other operation throws {@link UnsupportedOperationException}.
     */
    private static class TestClusterClient
    implements ClusterClient {
        private TestClusterClient() {
        }

        // ---- supported operations ----

        /** @return the fixed cluster id {@code "test"} */
        public Object getClusterId() {
            return "test";
        }

        /** Nothing to release. */
        public void close() {
        }

        // ---- cluster-level operations (all unsupported) ----

        public Configuration getFlinkConfiguration() {
            throw new UnsupportedOperationException();
        }

        public String getWebInterfaceURL() {
            throw new UnsupportedOperationException();
        }

        public void shutDownCluster() {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
            throw new UnsupportedOperationException();
        }

        // ---- job submission and inspection (all unsupported) ----

        public CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Map<String, Object>> getAccumulators(JobID jobID, ClassLoader loader) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<CoordinationResponse> sendCoordinationRequest(JobID jobId, String operatorUid, CoordinationRequest request) {
            throw new UnsupportedOperationException();
        }

        // ---- cancel / stop (all unsupported) ----

        public CompletableFuture<Acknowledge> cancel(JobID jobId) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<String> cancelWithSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<String> stopWithSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<String> stopWithDetachedSavepoint(JobID jobId, boolean advanceToEndOfEventTime, @Nullable String savepointDirectory, SavepointFormatType formatType) {
            throw new UnsupportedOperationException();
        }

        // ---- savepoints and checkpoints (all unsupported) ----

        public CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<String> triggerDetachedSavepoint(JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Long> triggerCheckpoint(JobID jobId, CheckpointType checkpointType) {
            throw new UnsupportedOperationException();
        }

        public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
            throw new UnsupportedOperationException();
        }
    }
}

