/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.runtime.rest.ConnectionClosedException;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
import org.apache.flink.runtime.rest.messages.ConversionException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameter;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.MessagePathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.TestRestHandler;
import org.apache.flink.runtime.rest.util.TestRestServerEndpoint;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.TooLongFrameException;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class RestServerEndpointITCase {
    private static final JobID PATH_JOB_ID = new JobID();
    private static final JobID QUERY_JOB_ID = new JobID();
    private static final String JOB_ID_KEY = "jobid";
    private static final Duration timeout = Duration.ofSeconds(10L);
    private static final int TEST_REST_MAX_CONTENT_LENGTH = 4096;
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    @TempDir
    Path tempFolder;
    private RestServerEndpoint serverEndpoint;
    private RestClient restClient;
    private TestUploadHandler testUploadHandler;
    private InetSocketAddress serverAddress;
    private final Configuration config;
    private SSLContext defaultSSLContext;
    private SSLSocketFactory defaultSSLSocketFactory;
    private TestHandler testHandler;

    public RestServerEndpointITCase(Configuration config) {
        this.config = Objects.requireNonNull(config);
    }

    @Parameters
    public static Collection<Object[]> data() throws Exception {
        Configuration config = RestServerEndpointITCase.getBaseConfig();
        String truststorePath = RestServerEndpointITCase.getTestResource("local127.truststore").getAbsolutePath();
        String keystorePath = RestServerEndpointITCase.getTestResource("local127.keystore").getAbsolutePath();
        Configuration sslConfig = new Configuration(config);
        sslConfig.set(SecurityOptions.SSL_REST_ENABLED, (Object)true);
        sslConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE, (Object)truststorePath);
        sslConfig.set(SecurityOptions.SSL_REST_TRUSTSTORE_PASSWORD, (Object)"password");
        sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE, (Object)keystorePath);
        sslConfig.set(SecurityOptions.SSL_REST_KEYSTORE_PASSWORD, (Object)"password");
        sslConfig.set(SecurityOptions.SSL_REST_KEY_PASSWORD, (Object)"password");
        Configuration sslRestAuthConfig = new Configuration(sslConfig);
        sslRestAuthConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, (Object)true);
        Configuration sslPinningRestAuthConfig = new Configuration(sslRestAuthConfig);
        sslPinningRestAuthConfig.set(SecurityOptions.SSL_REST_CERT_FINGERPRINT, (Object)SSLUtilsTest.getRestCertificateFingerprint(sslPinningRestAuthConfig, "flink.test"));
        return Arrays.asList({config}, {sslConfig}, {sslRestAuthConfig}, {sslPinningRestAuthConfig});
    }

    private static Configuration getBaseConfig() {
        String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
        Configuration config = new Configuration();
        config.set(RestOptions.BIND_PORT, (Object)"0");
        config.set(RestOptions.BIND_ADDRESS, (Object)loopbackAddress);
        config.set(RestOptions.ADDRESS, (Object)loopbackAddress);
        config.set(RestOptions.SERVER_MAX_CONTENT_LENGTH, (Object)4096);
        config.set(RestOptions.CLIENT_MAX_CONTENT_LENGTH, (Object)4096);
        return config;
    }

    @BeforeEach
    void setup() throws Exception {
        this.config.set(WebOptions.UPLOAD_DIR, (Object)this.tempFolder.toUri().getPath());
        this.defaultSSLContext = SSLContext.getDefault();
        this.defaultSSLSocketFactory = HttpsURLConnection.getDefaultSSLSocketFactory();
        SSLContext sslClientContext = SSLUtils.createRestSSLContext((Configuration)this.config, (boolean)true);
        if (sslClientContext != null) {
            SSLContext.setDefault(sslClientContext);
            HttpsURLConnection.setDefaultSSLSocketFactory(sslClientContext.getSocketFactory());
        }
        TestingRestfulGateway mockRestfulGateway = new TestingRestfulGateway.Builder().build();
        GatewayRetriever mockGatewayRetriever = () -> CompletableFuture.completedFuture(mockRestfulGateway);
        this.testHandler = new TestHandler((GatewayRetriever<RestfulGateway>)mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        TestVersionHandler testVersionHandler = new TestVersionHandler((GatewayRetriever<? extends RestfulGateway>)mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        TestRestHandler testVersionSelectionHandler1 = new TestRestHandler(mockGatewayRetriever, TestVersionSelectionHeaders1.INSTANCE, FutureUtils.completedExceptionally((Throwable)new RestHandlerException("test failure 1", HttpResponseStatus.OK)));
        TestRestHandler testVersionSelectionHandler2 = new TestRestHandler(mockGatewayRetriever, TestVersionSelectionHeaders2.INSTANCE, FutureUtils.completedExceptionally((Throwable)new RestHandlerException("test failure 2", HttpResponseStatus.ACCEPTED)));
        this.testUploadHandler = new TestUploadHandler((GatewayRetriever<? extends RestfulGateway>)mockGatewayRetriever, RpcUtils.INF_TIMEOUT);
        StaticFileServerHandler staticFileServerHandler = new StaticFileServerHandler(mockGatewayRetriever, RpcUtils.INF_TIMEOUT, this.tempFolder.toFile());
        this.serverEndpoint = TestRestServerEndpoint.builder(this.config).withHandler((RestHandlerSpecification)new TestHeaders(), (ChannelInboundHandler)this.testHandler).withHandler((RestHandlerSpecification)TestUploadHeaders.INSTANCE, (ChannelInboundHandler)this.testUploadHandler).withHandler(testVersionHandler).withHandler(testVersionSelectionHandler1).withHandler(testVersionSelectionHandler2).withHandler((RestHandlerSpecification)WebContentHandlerSpecification.getInstance(), (ChannelInboundHandler)staticFileServerHandler).withHandler(new TestUnavailableHandler((GatewayRetriever<RestfulGateway>)mockGatewayRetriever)).buildAndStart();
        this.restClient = new RestClient(this.config, (Executor)EXECUTOR_EXTENSION.getExecutor());
        this.serverAddress = this.serverEndpoint.getServerAddress();
    }

    @AfterEach
    void teardown() throws Exception {
        if (this.defaultSSLContext != null) {
            SSLContext.setDefault(this.defaultSSLContext);
            HttpsURLConnection.setDefaultSSLSocketFactory(this.defaultSSLSocketFactory);
        }
        if (this.restClient != null) {
            this.restClient.shutdown(timeout);
            this.restClient = null;
        }
        if (this.serverEndpoint != null) {
            this.serverEndpoint.closeAsync().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            this.serverEndpoint = null;
        }
    }

    @TestTemplate
    void testRequestInterleaving() throws Exception {
        BlockerSync sync = new BlockerSync();
        this.testHandler.handlerBody = id -> {
            if (id == 1) {
                try {
                    sync.block();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return CompletableFuture.completedFuture(new TestResponse((int)id));
        };
        CompletableFuture<TestResponse> response1 = this.sendRequestToTestHandler(new TestRequest(1));
        sync.awaitBlocker();
        CompletableFuture<TestResponse> response2 = this.sendRequestToTestHandler(new TestRequest(2));
        Assertions.assertThat((int)response2.get().id).isEqualTo(2);
        sync.releaseBlocker();
        Assertions.assertThat((int)response1.get().id).isOne();
    }

    @TestTemplate
    void testBadHandlerRequest() throws Exception {
        FaultyTestParameters parameters = new FaultyTestParameters();
        parameters.faultyJobIDPathParameter.resolve(PATH_JOB_ID);
        parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        CompletableFuture response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)new TestHeaders(), (MessageParameters)parameters, (RequestBody)new TestRequest(2));
        FlinkAssertions.assertThatFuture((CompletableFuture)response).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(RestClientException.class).satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((RestClientException)e.getCause()).getHttpResponseStatus()).isEqualTo((Object)HttpResponseStatus.BAD_REQUEST)});
    }

    @TestTemplate
    void testShouldRespectMaxContentLengthLimitForRequests() throws Exception {
        this.testHandler.handlerBody = id -> {
            throw new AssertionError((Object)"Request should not arrive at server.");
        };
        FlinkAssertions.assertThatFuture(this.sendRequestToTestHandler(new TestRequest(2, RestServerEndpointITCase.createStringOfSize(4096)))).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(RestClientException.class).withMessageContaining("Try to raise");
    }

    @TestTemplate
    void testShouldRespectMaxContentLengthLimitForResponses() throws Exception {
        this.testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse((int)id, RestServerEndpointITCase.createStringOfSize(4096)));
        FlinkAssertions.assertThatFuture(this.sendRequestToTestHandler(new TestRequest(1))).eventuallyFailsWith(ExecutionException.class).withCauseInstanceOf(TooLongFrameException.class).withMessageContaining("Try to raise");
    }

    @TestTemplate
    void testFileUpload() throws Exception {
        String boundary = RestServerEndpointITCase.generateMultiPartBoundary();
        String uploadedContent = "hello";
        HttpURLConnection connection = this.openHttpConnectionForUpload(boundary, TestUploadHeaders.INSTANCE.getTargetRestEndpointURL());
        RestServerEndpointITCase.uploadFile(connection, "hello", boundary);
        Assertions.assertThat((int)connection.getResponseCode()).isEqualTo(200);
        byte[] lastUploadedFileContents = this.testUploadHandler.getLastUploadedFileContents();
        Assertions.assertThat((String)"hello").isEqualTo(new String(lastUploadedFileContents, StandardCharsets.UTF_8));
    }

    @TestTemplate
    void testFileUploadLimitedToAllowedUris() throws Exception {
        String boundary = RestServerEndpointITCase.generateMultiPartBoundary();
        File uploadDir = new File(this.tempFolder.toString(), "flink-web-upload");
        File[] preUploadFiles = uploadDir.listFiles();
        Assertions.assertThat((boolean)TestVersionHeaders.INSTANCE.acceptsFileUploads()).isFalse();
        String uri = TestVersionHeaders.INSTANCE.getTargetRestEndpointURL();
        HttpURLConnection connection = this.openHttpConnectionForUpload(boundary, uri);
        RestServerEndpointITCase.uploadFile(connection, "hello", boundary);
        Assertions.assertThat((int)connection.getResponseCode()).isEqualTo(400);
        Object[] postUploadFiles = uploadDir.listFiles();
        Assertions.assertThat((Object[])postUploadFiles).isEqualTo((Object)preUploadFiles);
    }

    @TestTemplate
    void testMultiPartFormDataWithoutFileUpload() throws Exception {
        String boundary = RestServerEndpointITCase.generateMultiPartBoundary();
        String crlf = "\r\n";
        HttpURLConnection connection = this.openHttpConnectionForUpload(boundary, TestUploadHeaders.INSTANCE.getTargetRestEndpointURL());
        try (OutputStream output = connection.getOutputStream();
             PrintWriter writer = new PrintWriter((Writer)new OutputStreamWriter(output, StandardCharsets.UTF_8), true);){
            writer.append("--" + boundary).append("\r\n");
            writer.append("Content-Disposition: form-data; name=\"foo\"").append("\r\n");
            writer.append("\r\n").flush();
            output.write("test".getBytes(StandardCharsets.UTF_8));
            output.flush();
            writer.append("\r\n").flush();
            writer.append("--" + boundary + "--").append("\r\n").flush();
        }
        Assertions.assertThat((int)connection.getResponseCode()).isEqualTo(400);
    }

    @TestTemplate
    void testStaticFileServerHandler() throws Exception {
        File file = TempDirUtils.newFile((Path)this.tempFolder);
        Files.write(file.toPath(), Collections.singletonList("foobar"), new OpenOption[0]);
        URL url = new URL(this.serverEndpoint.getRestBaseUrl() + "/" + file.getName());
        HttpURLConnection connection = (HttpURLConnection)url.openConnection();
        connection.setRequestMethod("GET");
        String fileContents = IOUtils.toString((InputStream)connection.getInputStream());
        Assertions.assertThat((String)fileContents.trim()).isEqualTo("foobar");
    }

    @TestTemplate
    void testVersioning() throws Exception {
        CompletableFuture unspecifiedVersionResponse = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionHeaders.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList());
        unspecifiedVersionResponse.get(5L, TimeUnit.SECONDS);
        CompletableFuture specifiedVersionResponse = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionHeaders.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion)RuntimeRestAPIVersion.V1);
        specifiedVersionResponse.get(5L, TimeUnit.SECONDS);
    }

    @TestTemplate
    void testVersionSelection() throws Exception {
        CompletableFuture version1Response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionSelectionHeaders1.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion)RuntimeRestAPIVersion.V0);
        FlinkAssertions.assertThatFuture((CompletableFuture)version1Response).failsWithin(5L, TimeUnit.SECONDS).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(RestClientException.class).satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((RestClientException)e.getCause()).getHttpResponseStatus()).isEqualTo((Object)HttpResponseStatus.OK)});
        CompletableFuture version2Response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestVersionSelectionHeaders2.INSTANCE, (MessageParameters)EmptyMessageParameters.getInstance(), (RequestBody)EmptyRequestBody.getInstance(), Collections.emptyList(), (RestAPIVersion)RuntimeRestAPIVersion.V1);
        FlinkAssertions.assertThatFuture((CompletableFuture)version2Response).failsWithin(5L, TimeUnit.SECONDS).withThrowableOfType(ExecutionException.class).withCauseInstanceOf(RestClientException.class).satisfies(new ThrowingConsumer[]{e -> Assertions.assertThat((Comparable)((RestClientException)e.getCause()).getHttpResponseStatus()).isEqualTo((Object)HttpResponseStatus.ACCEPTED)});
    }

    @TestTemplate
    void testDefaultVersionRouting() throws Exception {
        ((AbstractBooleanAssert)Assumptions.assumeThat((Boolean)((Boolean)this.config.get(SecurityOptions.SSL_REST_ENABLED))).as("Ignoring SSL-enabled test to keep OkHttp usage simple.", new Object[0])).isFalse();
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url(this.serverEndpoint.getRestBaseUrl() + TestVersionSelectionHeaders2.INSTANCE.getTargetRestEndpointURL()).build();
        try (Response response = client.newCall(request).execute();){
            Assertions.assertThat((int)response.code()).isEqualTo(HttpResponseStatus.ACCEPTED.code());
        }
    }

    @TestTemplate
    void testNonSslRedirectForEnabledSsl() throws Exception {
        Assumptions.assumeThat((Boolean)((Boolean)this.config.get(SecurityOptions.SSL_REST_ENABLED))).isTrue();
        OkHttpClient client = new OkHttpClient.Builder().followRedirects(false).build();
        String httpsUrl = this.serverEndpoint.getRestBaseUrl() + "/path";
        String httpUrl = httpsUrl.replace("https://", "http://");
        Request request = new Request.Builder().url(httpUrl).build();
        try (Response response = client.newCall(request).execute();){
            Assertions.assertThat((int)response.code()).isEqualTo(HttpResponseStatus.MOVED_PERMANENTLY.code());
            Assertions.assertThat((Collection)response.headers().names()).contains((Object[])new String[]{"location"});
            Assertions.assertThat((String)response.header("location")).isEqualTo(httpsUrl);
        }
    }

    @TestTemplate
    void testShouldWaitForHandlersWhenClosing() throws Exception {
        this.testHandler.closeFuture = new CompletableFuture();
        BlockerSync sync = new BlockerSync();
        this.testHandler.handlerBody = id -> CompletableFuture.supplyAsync(() -> {
            try {
                sync.block();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new TestResponse((int)id);
        });
        CompletableFuture closeRestServerEndpointFuture = this.serverEndpoint.closeAsync();
        Assertions.assertThat((CompletableFuture)closeRestServerEndpointFuture).isNotDone();
        CompletableFuture<TestResponse> request = this.sendRequestToTestHandler(new TestRequest(1));
        sync.awaitBlocker();
        this.testHandler.closeFuture.complete(null);
        Assertions.assertThat((CompletableFuture)closeRestServerEndpointFuture).isNotDone();
        sync.releaseBlocker();
        request.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        closeRestServerEndpointFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testRequestsRejectedAfterShutdownOfHandlerIsCompleted() throws Exception {
        this.testHandler.handlerBody = id -> CompletableFuture.completedFuture(new TestResponse((int)id, "foobar"));
        this.testUploadHandler.closeFuture = new CompletableFuture();
        CompletableFuture closeRestServerEndpointFuture = this.serverEndpoint.closeAsync();
        Assertions.assertThat((CompletableFuture)closeRestServerEndpointFuture).isNotDone();
        this.testHandler.closeLatch.await();
        CompletableFuture<TestResponse> request = this.sendRequestToTestHandler(new TestRequest(1));
        try {
            request.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            Assertions.fail((String)"Expected a ConnectionClosedException");
        }
        catch (ExecutionException ee) {
            if (!ExceptionUtils.findThrowable((Throwable)ee, ConnectionClosedException.class).isPresent()) {
                throw ee;
            }
        }
        finally {
            this.testUploadHandler.closeFuture.complete(null);
            closeRestServerEndpointFuture.get();
        }
    }

    @TestTemplate
    void testRestServerBindPort() throws Exception {
        int portRangeStart = 52300;
        int portRangeEnd = 52400;
        Configuration config = new Configuration();
        config.set(RestOptions.ADDRESS, (Object)"localhost");
        config.set(RestOptions.BIND_PORT, (Object)"52300-52400");
        try (TestRestServerEndpoint serverEndpoint1 = TestRestServerEndpoint.builder(config).build();
             TestRestServerEndpoint serverEndpoint2 = TestRestServerEndpoint.builder(config).build();){
            serverEndpoint1.start();
            serverEndpoint2.start();
            Assertions.assertThat((int)serverEndpoint1.getServerAddress().getPort()).isNotEqualTo(serverEndpoint2.getServerAddress().getPort());
            Assertions.assertThat((int)serverEndpoint1.getServerAddress().getPort()).isGreaterThanOrEqualTo(52300);
            Assertions.assertThat((int)serverEndpoint1.getServerAddress().getPort()).isLessThanOrEqualTo(52400);
            Assertions.assertThat((int)serverEndpoint2.getServerAddress().getPort()).isGreaterThanOrEqualTo(52300);
            Assertions.assertThat((int)serverEndpoint2.getServerAddress().getPort()).isLessThanOrEqualTo(52400);
        }
    }

    @TestTemplate
    void testEndpointsMustBeUnique() throws Exception {
        CommonTestUtils.assertThrows((String)"REST handler registration", FlinkRuntimeException.class, () -> {
            try (TestRestServerEndpoint restServerEndpoint = TestRestServerEndpoint.builder(this.config).withHandler((RestHandlerSpecification)new TestHeaders(), (ChannelInboundHandler)this.testHandler).withHandler((RestHandlerSpecification)new TestHeaders(), (ChannelInboundHandler)this.testUploadHandler).build();){
                restServerEndpoint.start();
                Object var2_2 = null;
                return var2_2;
            }
        });
    }

    @TestTemplate
    void testDuplicateHandlerRegistrationIsForbidden() throws Exception {
        CommonTestUtils.assertThrows((String)"Duplicate REST handler", FlinkRuntimeException.class, () -> {
            try (TestRestServerEndpoint restServerEndpoint = TestRestServerEndpoint.builder(this.config).withHandler((RestHandlerSpecification)new TestHeaders(), (ChannelInboundHandler)this.testHandler).withHandler((RestHandlerSpecification)TestUploadHeaders.INSTANCE, (ChannelInboundHandler)this.testHandler).build();){
                restServerEndpoint.start();
                Object var2_2 = null;
                return var2_2;
            }
        });
    }

    @TestTemplate
    void testOnUnavailableRpcEndpointReturns503() throws IOException {
        CompletableFuture response = this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)TestUnavailableHeaders.INSTANCE);
        Assertions.assertThatThrownBy(response::get).extracting(x -> ExceptionUtils.findThrowable((Throwable)x, RestClientException.class)).extracting(Optional::get).extracting(RestClientException::getHttpResponseStatus).isEqualTo((Object)HttpResponseStatus.SERVICE_UNAVAILABLE);
    }

    private static File getTestResource(String fileName) {
        ClassLoader classLoader = ClassLoader.getSystemClassLoader();
        URL resource = classLoader.getResource(fileName);
        if (resource == null) {
            throw new IllegalArgumentException(String.format("Test resource %s does not exist", fileName));
        }
        return new File(resource.getFile());
    }

    private HttpURLConnection openHttpConnectionForUpload(String boundary, String uploadUri) throws IOException {
        HttpURLConnection connection = (HttpURLConnection)new URL(this.serverEndpoint.getRestBaseUrl() + uploadUri).openConnection();
        connection.setDoOutput(true);
        connection.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
        return connection;
    }

    private static String generateMultiPartBoundary() {
        return Long.toHexString(System.currentTimeMillis());
    }

    private static String createStringOfSize(int size) {
        StringBuilder sb = new StringBuilder(size);
        for (int i = 0; i < size; ++i) {
            sb.append('a');
        }
        return sb.toString();
    }

    private static void uploadFile(HttpURLConnection connection, String content, String boundary) throws IOException {
        String crlf = "\r\n";
        try (OutputStream output = connection.getOutputStream();
             PrintWriter writer = new PrintWriter((Writer)new OutputStreamWriter(output, StandardCharsets.UTF_8), true);){
            writer.append("--" + boundary).append("\r\n");
            writer.append("Content-Disposition: form-data; name=\"foo\"; filename=\"bar\"").append("\r\n");
            writer.append("Content-Type: plain/text; charset=utf8").append("\r\n");
            writer.append("\r\n").flush();
            output.write(content.getBytes(StandardCharsets.UTF_8));
            output.flush();
            writer.append("\r\n").flush();
            writer.append("--" + boundary + "--").append("\r\n").flush();
        }
    }

    private CompletableFuture<TestResponse> sendRequestToTestHandler(TestRequest testRequest) {
        try {
            return this.restClient.sendRequest(this.serverAddress.getHostName(), this.serverAddress.getPort(), (MessageHeaders)new TestHeaders(), (MessageParameters)RestServerEndpointITCase.createTestParameters(), (RequestBody)testRequest);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static TestParameters createTestParameters() {
        TestParameters parameters = new TestParameters();
        parameters.jobIDPathParameter.resolve(PATH_JOB_ID);
        parameters.jobIDQueryParameter.resolve(Collections.singletonList(QUERY_JOB_ID));
        return parameters;
    }

    private static class TestHandler
    extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
        private final OneShotLatch closeLatch = new OneShotLatch();
        private CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
        private Function<Integer, CompletableFuture<TestResponse>> handlerBody;

        TestHandler(GatewayRetriever<RestfulGateway> leaderRetriever, Duration timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)new TestHeaders());
        }

        protected CompletableFuture<TestResponse> handleRequest(@Nonnull HandlerRequest<TestRequest> request, RestfulGateway gateway) {
            Assertions.assertThat((Comparable)((JobID)request.getPathParameter(JobIDPathParameter.class))).isEqualTo((Object)PATH_JOB_ID);
            Assertions.assertThat((Comparable)((JobID)request.getQueryParameter(JobIDQueryParameter.class).get(0))).isEqualTo((Object)QUERY_JOB_ID);
            int id = ((TestRequest)request.getRequestBody()).id;
            return this.handlerBody.apply(id);
        }

        public CompletableFuture<Void> closeHandlerAsync() {
            this.closeLatch.trigger();
            return this.closeFuture;
        }
    }

    static class TestVersionHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        TestVersionHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Duration timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestVersionHeaders.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }
    }

    private static enum TestVersionSelectionHeaders1 implements TestVersionSelectionHeadersBase
    {
        INSTANCE;


        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RuntimeRestAPIVersion.V0);
        }
    }

    private static enum TestVersionSelectionHeaders2 implements TestVersionSelectionHeadersBase
    {
        INSTANCE;


        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RuntimeRestAPIVersion.V1);
        }
    }

    private static class TestUploadHandler
    extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        private volatile CompletableFuture<Void> closeFuture = CompletableFuture.completedFuture(null);
        private volatile byte[] lastUploadedFileContents;

        private TestUploadHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Duration timeout) {
            super(leaderRetriever, timeout, Collections.emptyMap(), (MessageHeaders)TestUploadHeaders.INSTANCE);
        }

        protected CompletableFuture<EmptyResponseBody> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) throws RestHandlerException {
            Collection uploadedFiles = request.getUploadedFiles().stream().map(File::toPath).collect(Collectors.toList());
            if (uploadedFiles.size() != 1) {
                throw new RestHandlerException("Expected 1 file, received " + uploadedFiles.size() + ".", HttpResponseStatus.BAD_REQUEST);
            }
            try {
                this.lastUploadedFileContents = Files.readAllBytes((Path)uploadedFiles.iterator().next());
            }
            catch (IOException e) {
                throw new RestHandlerException("Could not read contents of uploaded file.", HttpResponseStatus.INTERNAL_SERVER_ERROR, (Throwable)e);
            }
            return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        }

        public byte[] getLastUploadedFileContents() {
            return this.lastUploadedFileContents;
        }

        protected CompletableFuture<Void> closeHandlerAsync() {
            return this.closeFuture;
        }
    }

    private static class TestHeaders
    implements RuntimeMessageHeaders<TestRequest, TestResponse, TestParameters> {
        private TestHeaders() {
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/test/:jobid";
        }

        public Class<TestRequest> getRequestClass() {
            return TestRequest.class;
        }

        public Class<TestResponse> getResponseClass() {
            return TestResponse.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        public TestParameters getUnresolvedMessageParameters() {
            return new TestParameters();
        }
    }

    private static enum TestUploadHeaders implements RuntimeMessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
    {
        INSTANCE;


        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.POST;
        }

        public String getTargetRestEndpointURL() {
            return "/upload";
        }

        public String getDescription() {
            return "";
        }

        public boolean acceptsFileUploads() {
            return true;
        }
    }

    private static class TestUnavailableHandler
    extends TestRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        protected TestUnavailableHandler(GatewayRetriever<RestfulGateway> leaderRetriever) {
            super(leaderRetriever, TestUnavailableHeaders.INSTANCE, new CompletableFuture[]{FutureUtils.completedExceptionally((Throwable)new EndpointNotStartedException("test exception"))});
        }
    }

    private static class TestRequest
    implements RequestBody {
        public final int id;
        public final String content;

        public TestRequest(int id) {
            this(id, null);
        }

        @JsonCreator
        public TestRequest(@JsonProperty(value="id") int id, @JsonProperty(value="content") String content) {
            this.id = id;
            this.content = content;
        }
    }

    private static class TestResponse
    implements ResponseBody {
        public final int id;
        public final String content;

        public TestResponse(int id) {
            this(id, null);
        }

        @JsonCreator
        public TestResponse(@JsonProperty(value="id") int id, @JsonProperty(value="content") String content) {
            this.id = id;
            this.content = content;
        }
    }

    private static class FaultyTestParameters
    extends TestParameters {
        private final FaultyJobIDPathParameter faultyJobIDPathParameter = new FaultyJobIDPathParameter();

        private FaultyTestParameters() {
        }

        @Override
        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.faultyJobIDPathParameter);
        }
    }

    static class FaultyJobIDPathParameter
    extends MessagePathParameter<JobID> {
        FaultyJobIDPathParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY);
        }

        protected JobID convertFromString(String value) throws ConversionException {
            return JobID.fromHexString((String)value);
        }

        protected String convertToString(JobID value) {
            return "foobar";
        }

        public String getDescription() {
            return "faulty JobID parameter";
        }
    }

    private static class TestParameters
    extends MessageParameters {
        private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
        private final JobIDQueryParameter jobIDQueryParameter = new JobIDQueryParameter();

        private TestParameters() {
        }

        public Collection<MessagePathParameter<?>> getPathParameters() {
            return Collections.singleton(this.jobIDPathParameter);
        }

        public Collection<MessageQueryParameter<?>> getQueryParameters() {
            return Collections.singleton(this.jobIDQueryParameter);
        }
    }

    static class JobIDQueryParameter
    extends MessageQueryParameter<JobID> {
        JobIDQueryParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY, MessageParameter.MessageParameterRequisiteness.MANDATORY);
        }

        public JobID convertStringToValue(String value) {
            return JobID.fromHexString((String)value);
        }

        public String convertValueToString(JobID value) {
            return value.toString();
        }

        public String getDescription() {
            return "query JobID parameter";
        }
    }

    static enum TestVersionHeaders implements RuntimeMessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
    {
        INSTANCE;


        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/test/versioning";
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return null;
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }

        public Collection<RuntimeRestAPIVersion> getSupportedAPIVersions() {
            return Collections.singleton(RuntimeRestAPIVersion.V1);
        }
    }

    private static enum TestUnavailableHeaders implements RuntimeMessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters>
    {
        INSTANCE;


        public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        public String getTargetRestEndpointURL() {
            return "/unavailable";
        }

        public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        public String getDescription() {
            return "";
        }

        public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }
    }

    static class JobIDPathParameter
    extends MessagePathParameter<JobID> {
        JobIDPathParameter() {
            super(RestServerEndpointITCase.JOB_ID_KEY);
        }

        public JobID convertFromString(String value) {
            return JobID.fromHexString((String)value);
        }

        protected String convertToString(JobID value) {
            return value.toString();
        }

        public String getDescription() {
            return "correct JobID parameter";
        }
    }

    private static interface TestVersionSelectionHeadersBase
    extends MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> {
        default public Class<EmptyRequestBody> getRequestClass() {
            return EmptyRequestBody.class;
        }

        default public HttpMethodWrapper getHttpMethod() {
            return HttpMethodWrapper.GET;
        }

        default public String getTargetRestEndpointURL() {
            return "/test/select-version";
        }

        default public Class<EmptyResponseBody> getResponseClass() {
            return EmptyResponseBody.class;
        }

        default public HttpResponseStatus getResponseStatusCode() {
            return HttpResponseStatus.OK;
        }

        default public String getDescription() {
            return null;
        }

        default public EmptyMessageParameters getUnresolvedMessageParameters() {
            return EmptyMessageParameters.getInstance();
        }
    }
}

