/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.flink.runtime.rpc.pekko.PekkoBasedEndpoint;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;

class PekkoRpcActorTest {
    /** Logger of this test class. */
    private static final Logger LOG = LoggerFactory.getLogger(PekkoRpcActorTest.class);

    /**
     * Timeout passed to the {@code @RpcTimeout}-annotated gateway calls issued by the tests. It is
     * never reassigned, hence declared {@code final}.
     */
    private static final Duration timeout = Duration.ofSeconds(10L);

    /** RPC service used by the tests; assigned in {@code setup()}, terminated in {@code shutdown()}. */
    private static PekkoRpcService pekkoRpcService;

    PekkoRpcActorTest() {
    }

    /** Creates the RPC service on top of a local actor system, using default configurations. */
    @BeforeAll
    static void setup() {
        final ActorSystem localActorSystem =
                PekkoUtils.createLocalActorSystem(new Configuration());
        pekkoRpcService =
                new PekkoRpcService(
                        localActorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    /** Terminates the RPC service created in {@code setup()}. */
    @AfterAll
    static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        RpcUtils.terminateRpcService(pekkoRpcService);
    }

    /**
     * Verifies that a gateway obtained by connecting to an endpoint's address reports the same
     * address as the endpoint itself.
     */
    @Test
    void testAddressResolution() throws Exception {
        // try-with-resources closes the endpoint so that it does not linger in the RPC service
        // that is reused by the other tests
        try (DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(pekkoRpcService)) {
            final CompletableFuture<DummyRpcGateway> futureRpcGateway =
                    pekkoRpcService.connect(rpcEndpoint.getAddress(), DummyRpcGateway.class);

            final DummyRpcGateway rpcGateway = futureRpcGateway.get();

            Assertions.assertThat(rpcGateway.getAddress()).isEqualTo(rpcEndpoint.getAddress());
        }
    }

    /**
     * Verifies that connecting to the address {@code "foobar"} yields a future whose failure is
     * caused by an {@link RpcConnectionException}.
     */
    @Test
    void testFailingAddressResolution() throws Exception {
        final CompletableFuture<DummyRpcGateway> futureRpcGateway =
                pekkoRpcService.connect("foobar", DummyRpcGateway.class);

        Assertions.assertThatThrownBy(futureRpcGateway::get)
                .hasCauseInstanceOf(RpcConnectionException.class);
    }

    /**
     * Verifies that a call issued before the endpoint was started fails with an
     * {@link EndpointNotStartedException}, and that the same call succeeds once the endpoint
     * has been started.
     */
    @Test
    void testMessageDiscarding() throws Exception {
        final int expectedValue = 1337;

        final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(pekkoRpcService);
        final DummyRpcGateway rpcGateway = rpcEndpoint.getSelfGateway(DummyRpcGateway.class);

        // the endpoint has not been started yet
        Assertions.assertThatThrownBy(() -> rpcGateway.foobar().get())
                .hasCauseInstanceOf(EndpointNotStartedException.class);

        rpcEndpoint.setFoobar(expectedValue);
        rpcEndpoint.start();

        try {
            final Integer actualValue = rpcGateway.foobar().get();

            Assertions.assertThat(actualValue).isEqualTo(expectedValue);
        } finally {
            RpcUtils.terminateRpcEndpoint(rpcEndpoint);
        }
    }

    /**
     * Verifies that the termination future of a started endpoint is not done before the endpoint
     * is closed, and that it completes after {@code closeAsync()} was triggered from the RPC
     * service's scheduled executor.
     */
    @Test
    void testRpcEndpointTerminationFuture() throws Exception {
        final DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(pekkoRpcService);
        rpcEndpoint.start();

        final CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();
        Assertions.assertThat(terminationFuture).isNotDone();

        CompletableFuture.runAsync(
                rpcEndpoint::closeAsync, pekkoRpcService.getScheduledExecutor());

        // blocks until the endpoint has terminated
        terminationFuture.get();
    }

    /**
     * Verifies that an exception thrown directly by an RPC method reaches the caller as the cause
     * of the failed result future, with type and message intact.
     */
    @Test
    void testExceptionPropagation() throws Exception {
        // try-with-resources stops the started endpoint so it does not leak into the RPC service
        // that is reused by the other tests
        try (ExceptionalEndpoint rpcEndpoint = new ExceptionalEndpoint(pekkoRpcService)) {
            rpcEndpoint.start();

            final ExceptionalGateway rpcGateway =
                    rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
            final CompletableFuture<Integer> result = rpcGateway.doStuff();

            Assertions.assertThatThrownBy(() -> result.get())
                    .extracting(e -> e.getCause())
                    .satisfies(
                            e ->
                                    Assertions.assertThat(e)
                                            .isInstanceOf(RuntimeException.class)
                                            .hasMessage("my super specific test exception"));
        }
    }

    /**
     * Verifies that an exception which fails the future returned by an RPC method reaches the
     * caller as the cause of the failed result future, with its message intact.
     */
    @Test
    void testExceptionPropagationFuturePiping() throws Exception {
        // try-with-resources stops the started endpoint so it does not leak into the RPC service
        // that is reused by the other tests
        try (ExceptionalFutureEndpoint rpcEndpoint =
                new ExceptionalFutureEndpoint(pekkoRpcService)) {
            rpcEndpoint.start();

            final ExceptionalGateway rpcGateway =
                    rpcEndpoint.getSelfGateway(ExceptionalGateway.class);
            final CompletableFuture<Integer> result = rpcGateway.doStuff();

            Assertions.assertThatThrownBy(() -> result.get())
                    .extracting(e -> e.getCause())
                    .satisfies(
                            e ->
                                    Assertions.assertThat(e)
                                            .isInstanceOf(Exception.class)
                                            .hasMessage("some test"));
        }
    }

    /**
     * Verifies that the result future of a call made through a gateway connected from a second
     * actor system fails with an {@link RpcException} when the returned object cannot be
     * deserialized.
     *
     * <p>Both RPC services are terminated in nested {@code finally} blocks so that the server
     * service is shut down even if creating or terminating the client service throws.
     */
    @Test
    void testResultFutureFailsOnDeserializationError() throws Exception {
        final PekkoRpcService serverPekkoRpcService =
                new PekkoRpcService(
                        PekkoUtils.createActorSystem(
                                "serverActorSystem",
                                PekkoUtils.getConfig(
                                        new Configuration(), new HostAndPort("localhost", 0))),
                        PekkoRpcServiceConfiguration.defaultConfiguration());

        try {
            final PekkoRpcService clientPekkoRpcService =
                    new PekkoRpcService(
                            PekkoUtils.createActorSystem(
                                    "clientActorSystem",
                                    PekkoUtils.getConfig(
                                            new Configuration(),
                                            new HostAndPort("localhost", 0))),
                            PekkoRpcServiceConfiguration.defaultConfiguration());

            try {
                final DeserializatonFailingEndpoint rpcEndpoint =
                        new DeserializatonFailingEndpoint(serverPekkoRpcService);
                rpcEndpoint.start();

                final DeserializatonFailingGateway rpcGateway =
                        rpcEndpoint.getSelfGateway(DeserializatonFailingGateway.class);

                // go through the client service so that the response is produced by a different
                // actor system than the one issuing the call
                final DeserializatonFailingGateway connect =
                        clientPekkoRpcService
                                .connect(
                                        rpcGateway.getAddress(),
                                        DeserializatonFailingGateway.class)
                                .get();

                FlinkAssertions.assertThatFuture(connect.doStuff())
                        .eventuallyFailsWith(ExecutionException.class)
                        .withCauseInstanceOf(RpcException.class);
            } finally {
                RpcUtils.terminateRpcService(clientPekkoRpcService);
            }
        } finally {
            RpcUtils.terminateRpcService(serverPekkoRpcService);
        }
    }

    /**
     * Verifies that an exception with which {@code onStop()} fails shows up as the cause of the
     * failed termination future returned by {@code closeAsync()}.
     */
    @Test
    void testOnStopExceptionPropagation() throws Exception {
        final FailingOnStopEndpoint rpcEndpoint =
                new FailingOnStopEndpoint(pekkoRpcService, "FailingOnStopEndpoint");
        rpcEndpoint.start();

        final CompletableFuture<Void> terminationFuture = rpcEndpoint.closeAsync();

        Assertions.assertThatThrownBy(terminationFuture::get)
                .hasCauseInstanceOf(FailingOnStopEndpoint.OnStopException.class);
    }

    /**
     * Starts and closes a simple endpoint and waits for its termination future, which must
     * complete without an exception.
     */
    @Test
    void testOnStopExecutedByMainThread() throws Exception {
        final SimpleRpcEndpoint simpleRpcEndpoint =
                new SimpleRpcEndpoint(pekkoRpcService, "SimpleRpcEndpoint");
        simpleRpcEndpoint.start();

        final CompletableFuture<Void> terminationFuture = simpleRpcEndpoint.closeAsync();

        // get() rethrows any failure of the termination
        terminationFuture.get();
    }

    /**
     * Verifies that the termination future of a started endpoint completes once the RPC service
     * it runs in is closed. The dedicated actor system is terminated afterwards in any case.
     */
    @Test
    void testActorTerminationWhenServiceShutdown() throws Exception {
        final ActorSystem rpcActorSystem = PekkoUtils.createDefaultActorSystem();
        final PekkoRpcService rpcService =
                new PekkoRpcService(
                        rpcActorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());

        try {
            final SimpleRpcEndpoint rpcEndpoint =
                    new SimpleRpcEndpoint(rpcService, SimpleRpcEndpoint.class.getSimpleName());
            rpcEndpoint.start();

            final CompletableFuture<Void> terminationFuture = rpcEndpoint.getTerminationFuture();

            rpcService.closeAsync();

            terminationFuture.get();
        } finally {
            rpcActorSystem.terminate();
            ScalaFutureUtils.toJava(rpcActorSystem.whenTerminated()).get();
        }
    }

    /**
     * Verifies that the termination future returned by {@code closeAsync()} stays incomplete
     * while the future returned by {@code onStop()} is pending, and completes after that future
     * has been completed.
     */
    @Test
    void testActorTerminationWithAsynchronousOnStopAction() throws Exception {
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final AsynchronousOnStopEndpoint endpoint =
                new AsynchronousOnStopEndpoint(pekkoRpcService, onStopFuture);

        try {
            endpoint.start();

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();

            Assertions.assertThat(terminationFuture).isNotDone();

            onStopFuture.complete(null);

            terminationFuture.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint);
        }
    }

    /**
     * Verifies that an endpoint whose {@code onStop()} schedules work on its main thread executor
     * still terminates: the termination future returned by {@code closeAsync()} must complete.
     */
    @Test
    void testMainThreadExecutionOnStop() throws Exception {
        final MainThreadExecutorOnStopEndpoint endpoint =
                new MainThreadExecutorOnStopEndpoint(pekkoRpcService);

        try {
            endpoint.start();

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();

            terminationFuture.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint);
        }
    }

    /**
     * Issues two asynchronous operations after {@code closeAsync()} was called, completes the
     * {@code onStop()} future while the first operation is still blocked, and then checks that
     * only the first operation was executed (returning 42) whereas the second one fails with a
     * {@link RecipientUnreachableException}.
     */
    @Test
    void testOnStopFutureCompletionDirectlyTerminatesRpcActor() throws Exception {
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final TerminatingAfterOnStopFutureCompletionEndpoint endpoint =
                new TerminatingAfterOnStopFutureCompletionEndpoint(pekkoRpcService, onStopFuture);

        try {
            endpoint.start();

            final AsyncOperationGateway asyncOperationGateway =
                    endpoint.getSelfGateway(AsyncOperationGateway.class);

            final CompletableFuture<Void> terminationFuture = endpoint.closeAsync();

            Assertions.assertThat(terminationFuture).isNotDone();

            final CompletableFuture<Integer> firstAsyncOperationFuture =
                    asyncOperationGateway.asyncOperation(timeout);
            final CompletableFuture<Integer> secondAsyncOperationFuture =
                    asyncOperationGateway.asyncOperation(timeout);

            // wait until the first operation is being executed by the endpoint
            endpoint.awaitEnterAsyncOperation();

            // complete the stop future while the first operation is still blocked
            onStopFuture.complete(null);

            Assertions.assertThat(terminationFuture).isNotDone();

            endpoint.triggerUnblockAsyncOperation();

            Assertions.assertThat(firstAsyncOperationFuture.get()).isEqualTo(42);

            terminationFuture.get();

            Assertions.assertThat(endpoint.getNumberAsyncOperationCalls()).isEqualTo(1);
            FlinkAssertions.assertThatFuture(secondAsyncOperationFuture)
                    .eventuallyFailsWith(ExecutionException.class)
                    .withCauseInstanceOf(RecipientUnreachableException.class);
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint);
        }
    }

    /** Verifies that starting an endpoint results in its {@code onStart()} method being called. */
    @Test
    void testOnStartIsCalledWhenRpcEndpointStarts() throws Exception {
        final OnStartEndpoint onStartEndpoint = new OnStartEndpoint(pekkoRpcService, null);

        try {
            onStartEndpoint.start();
            // blocks until onStart() has counted down the endpoint's latch
            onStartEndpoint.awaitUntilOnStartCalled();
        } finally {
            RpcUtils.terminateRpcEndpoint(onStartEndpoint);
        }
    }

    /**
     * Verifies that an exception thrown from {@code onStart()} fails the endpoint's termination
     * future with a cause chain containing that exception.
     */
    @Test
    void testOnStartFails() throws Exception {
        final FlinkException testException = new FlinkException("Test exception");
        final OnStartEndpoint onStartEndpoint =
                new OnStartEndpoint(pekkoRpcService, testException);

        onStartEndpoint.start();
        onStartEndpoint.awaitUntilOnStartCalled();

        Assertions.assertThatThrownBy(() -> onStartEndpoint.getTerminationFuture().get())
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                testException.getClass(), testException.getMessage()));
    }

    /**
     * Sends the {@code TERMINATE} control message twice to the endpoint's actor and verifies that
     * {@code onStop()} was nevertheless invoked exactly once.
     */
    @Test
    void callsOnStopOnlyOnce() throws Exception {
        final CompletableFuture<Void> onStopFuture = new CompletableFuture<>();
        final OnStopCountingRpcEndpoint endpoint =
                new OnStopCountingRpcEndpoint(pekkoRpcService, onStopFuture);

        try {
            endpoint.start();

            final PekkoBasedEndpoint selfGateway =
                    endpoint.getSelfGateway(PekkoBasedEndpoint.class);

            // try to terminate the actor twice
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());
            selfGateway.getActorRef().tell(ControlMessages.TERMINATE, ActorRef.noSender());

            endpoint.waitUntilOnStopHasBeenCalled();

            onStopFuture.complete(null);

            endpoint.getTerminationFuture().get();

            Assertions.assertThat(endpoint.getNumOnStopCalls()).isEqualTo(1);
        } finally {
            // make sure a pending onStop() future cannot block the termination below
            onStopFuture.complete(null);
            RpcUtils.terminateRpcEndpoint(endpoint);
        }
    }

    /**
     * Verifies that an endpoint name can be used again after the endpoint that previously held it
     * has terminated: the second endpoint must end up with the same address as the first one.
     */
    @Test
    void canReuseEndpointNameAfterTermination() throws Exception {
        final String endpointName = "not_unique";

        try (SimpleRpcEndpoint simpleRpcEndpoint1 =
                new SimpleRpcEndpoint(pekkoRpcService, endpointName)) {
            simpleRpcEndpoint1.start();

            // wait for the first endpoint to terminate before the name is used again
            simpleRpcEndpoint1.closeAsync().join();

            try (SimpleRpcEndpoint simpleRpcEndpoint2 =
                    new SimpleRpcEndpoint(pekkoRpcService, endpointName)) {
                simpleRpcEndpoint2.start();

                Assertions.assertThat(simpleRpcEndpoint2.getAddress())
                        .isEqualTo(simpleRpcEndpoint1.getAddress());
            }
        }
    }

    /**
     * Creates a second endpoint from within a callback chained onto the first endpoint's
     * termination future and verifies, by joining on the chained future, that this creation
     * completes.
     */
    @Test
    void terminationFutureDoesNotBlockRpcEndpointCreation() throws Exception {
        try (SimpleRpcEndpoint simpleRpcEndpoint =
                new SimpleRpcEndpoint(pekkoRpcService, "foobar")) {
            final CompletableFuture<Void> terminationFuture =
                    simpleRpcEndpoint.getTerminationFuture();

            // the new endpoint is created as part of completing the termination future
            final CompletableFuture<SimpleRpcEndpoint> foobar2 =
                    terminationFuture.thenApply(
                            ignored -> new SimpleRpcEndpoint(pekkoRpcService, "foobar2"));

            simpleRpcEndpoint.closeAsync();

            final SimpleRpcEndpoint simpleRpcEndpoint2 = foobar2.join();
            simpleRpcEndpoint2.close();
        }
    }

    /**
     * Creates two endpoints whose names share a prefix, closes the first one, and verifies that
     * connecting via the wildcard address for that prefix resolves to the remaining endpoint.
     */
    @Test
    void resolvesRunningRpcActor() throws Exception {
        final String endpointName = "foobar";

        try (RpcEndpoint simpleRpcEndpoint1 = createRpcEndpointWithRandomNameSuffix(endpointName);
                RpcEndpoint simpleRpcEndpoint2 =
                        createRpcEndpointWithRandomNameSuffix(endpointName)) {

            simpleRpcEndpoint1.closeAsync().join();

            final String wildcardName = RpcServiceUtils.createWildcardName(endpointName);
            final String wildcardAddress = PekkoRpcServiceUtils.getLocalRpcUrl(wildcardName);
            final RpcGateway rpcGateway =
                    pekkoRpcService.connect(wildcardAddress, RpcGateway.class).join();

            Assertions.assertThat(rpcGateway.getAddress())
                    .isEqualTo(simpleRpcEndpoint2.getAddress());
        }
    }

    /**
     * Creates a {@link SimpleRpcEndpoint} in the test's RPC service whose name is derived from
     * the given prefix via {@code RpcServiceUtils.createRandomName}.
     *
     * @param prefix prefix handed to the random name generation
     * @return the newly created (not started) endpoint
     */
    private RpcEndpoint createRpcEndpointWithRandomNameSuffix(String prefix) {
        final String randomizedName = RpcServiceUtils.createRandomName(prefix);
        return new SimpleRpcEndpoint(pekkoRpcService, randomizedName);
    }

    /** Verifies that a future-returning call through the self gateway may complete with null. */
    @Test
    void canRespondWithNullValueLocally() throws Exception {
        try (NullRespondingEndpoint nullRespondingEndpoint =
                new NullRespondingEndpoint(pekkoRpcService)) {
            nullRespondingEndpoint.start();

            final NullRespondingGateway selfGateway =
                    nullRespondingEndpoint.getSelfGateway(NullRespondingGateway.class);

            final CompletableFuture<Integer> nullValuedResponseFuture = selfGateway.foobar();

            Assertions.assertThat(nullValuedResponseFuture.join()).isNull();
        }
    }

    /** Verifies that a synchronous call through the self gateway may return null. */
    @Test
    void canRespondWithSynchronousNullValueLocally() throws Exception {
        try (NullRespondingEndpoint nullRespondingEndpoint =
                new NullRespondingEndpoint(pekkoRpcService)) {
            nullRespondingEndpoint.start();

            final NullRespondingGateway selfGateway =
                    nullRespondingEndpoint.getSelfGateway(NullRespondingGateway.class);

            final Integer value = selfGateway.synchronousFoobar();

            Assertions.assertThat(value).isNull();
        }
    }

    /**
     * Verifies that a {@link SerializedValue} returned through the self gateway, synchronously as
     * well as via a future, equals the value held by the endpoint.
     */
    @Test
    void canRespondWithSerializedValueLocally() throws Exception {
        try (SerializedValueRespondingEndpoint endpoint =
                new SerializedValueRespondingEndpoint(pekkoRpcService)) {
            endpoint.start();

            final SerializedValueRespondingGateway selfGateway =
                    endpoint.getSelfGateway(SerializedValueRespondingGateway.class);

            Assertions.assertThat(selfGateway.getSerializedValueSynchronously())
                    .isEqualTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE);

            final CompletableFuture<SerializedValue<String>> responseFuture =
                    selfGateway.getSerializedValue();

            Assertions.assertThat(responseFuture.get())
                    .isEqualTo(SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
        }
    }

    /**
     * Verifies that the runnable and the callable scheduled on the endpoint's main thread
     * executor complete no earlier than {@code SchedulingRpcEndpoint.DELAY_MILLIS} after the
     * schedule call was issued, and that the directly executed task completes as well.
     *
     * <p>The endpoint is terminated in a {@code finally} block so that it does not leak into the
     * RPC service that is reused by the other tests.
     */
    @Test
    void testScheduling() throws ExecutionException, InterruptedException {
        final SchedulingRpcEndpoint endpoint = new SchedulingRpcEndpoint(pekkoRpcService);

        try {
            endpoint.start();

            final SchedulingRpcEndpointGateway gateway =
                    endpoint.getSelfGateway(SchedulingRpcEndpointGateway.class);

            final CompletableFuture<Void> scheduleRunnableFuture = new CompletableFuture<>();
            final CompletableFuture<Void> scheduleCallableFuture = new CompletableFuture<>();
            final CompletableFuture<Void> executeFuture = new CompletableFuture<>();

            final long scheduleTime = System.nanoTime();
            // earliest point in time at which a scheduled task may have run
            final long earliestCompletion =
                    scheduleTime + Duration.ofMillis(SchedulingRpcEndpoint.DELAY_MILLIS).toNanos();

            gateway.schedule(scheduleRunnableFuture, scheduleCallableFuture, executeFuture);

            Assertions.assertThat(
                            scheduleRunnableFuture.thenApply(ignored -> System.nanoTime()).get())
                    .isGreaterThanOrEqualTo(earliestCompletion);
            Assertions.assertThat(
                            scheduleCallableFuture.thenApply(ignored -> System.nanoTime()).get())
                    .isGreaterThanOrEqualTo(earliestCompletion);

            executeFuture.get();
        } finally {
            RpcUtils.terminateRpcEndpoint(endpoint);
        }
    }

    /** Endpoint that answers {@code foobar()} with a configurable integer, 42 by default. */
    static class DummyRpcEndpoint extends RpcEndpoint implements DummyRpcGateway {

        /** Value returned by {@link #foobar()}; volatile because it is set via a plain setter. */
        private volatile int foobar = 42;

        protected DummyRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /** Replaces the value that subsequent {@link #foobar()} calls return. */
        public void setFoobar(int value) {
            foobar = value;
        }

        @Override
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(foobar);
        }
    }

    static interface DummyRpcGateway
    extends RpcGateway {
        public CompletableFuture<Integer> foobar();
    }

    /** Endpoint whose {@code doStuff()} throws a {@link RuntimeException} right away. */
    private static class ExceptionalEndpoint extends RpcEndpoint implements ExceptionalGateway {

        protected ExceptionalEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public CompletableFuture<Integer> doStuff() {
            throw new RuntimeException("my super specific test exception");
        }
    }

    private static interface ExceptionalGateway
    extends RpcGateway {
        public CompletableFuture<Integer> doStuff();
    }

    /**
     * Endpoint whose {@code doStuff()} returns a future that a separate thread fails with an
     * {@link Exception} after a short sleep.
     */
    private static class ExceptionalFutureEndpoint extends RpcEndpoint
            implements ExceptionalGateway {

        protected ExceptionalFutureEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Returns a future that is completed exceptionally with the message {@code "some test"}
         * by a newly started thread.
         *
         * @return the future that will be failed asynchronously
         */
        @Override
        public CompletableFuture<Integer> doStuff() {
            final CompletableFuture<Integer> future = new CompletableFuture<>();

            new Thread(
                            () -> {
                                try {
                                    Thread.sleep(10L);
                                } catch (InterruptedException e) {
                                    // restore the interrupt flag instead of silently dropping
                                    // it; the future is still failed below so callers never hang
                                    Thread.currentThread().interrupt();
                                }

                                future.completeExceptionally(new Exception("some test"));
                            })
                    .start();

            return future;
        }
    }

    /** Endpoint that responds with an object whose deserialization always fails. */
    private static class DeserializatonFailingEndpoint extends RpcEndpoint
            implements DeserializatonFailingGateway {

        protected DeserializatonFailingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public CompletableFuture<DeserializationFailingObject> doStuff() {
            final DeserializationFailingObject response = new DeserializationFailingObject();
            return CompletableFuture.completedFuture(response);
        }
    }

    private static interface DeserializatonFailingGateway
    extends RpcGateway {
        public CompletableFuture<DeserializationFailingObject> doStuff();
    }

    /** Endpoint whose {@code onStop()} returns a future failed with an {@link OnStopException}. */
    private static class FailingOnStopEndpoint extends RpcEndpoint implements RpcGateway {

        protected FailingOnStopEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }

        public CompletableFuture<Void> onStop() {
            final OnStopException failure = new OnStopException("Test exception.");
            return FutureUtils.completedExceptionally(failure);
        }

        /** Exception type that lets the test recognise the failure produced by {@code onStop()}. */
        private static class OnStopException extends FlinkException {

            private static final long serialVersionUID = 6701096588415871592L;

            public OnStopException(String message) {
                super(message);
            }
        }
    }

    /** Endpoint with an explicitly chosen endpoint id and no behaviour of its own. */
    private static class SimpleRpcEndpoint extends RpcEndpoint implements RpcGateway {

        protected SimpleRpcEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }
    }

    /** Endpoint whose {@code onStop()} hands out a future supplied (non-null) at construction. */
    static class AsynchronousOnStopEndpoint extends RpcEndpoint {

        /** Future returned by {@link #onStop()}; completed by the test. */
        private final CompletableFuture<Void> onStopFuture;

        protected AsynchronousOnStopEndpoint(
                RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);

            this.onStopFuture = Preconditions.checkNotNull(onStopFuture);
        }

        public CompletableFuture<Void> onStop() {
            return onStopFuture;
        }
    }

    /** Endpoint whose {@code onStop()} runs an empty task on the endpoint's main thread executor. */
    private static class MainThreadExecutorOnStopEndpoint extends RpcEndpoint {

        protected MainThreadExecutorOnStopEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        public CompletableFuture<Void> onStop() {
            // the returned future completes once the no-op task has run on the main thread executor
            return CompletableFuture.runAsync(() -> {}, getMainThreadExecutor());
        }
    }

    /**
     * Endpoint with a blocking {@code asyncOperation} that counts its invocations, and an
     * {@code onStop()} that returns a future supplied at construction.
     */
    private static class TerminatingAfterOnStopFutureCompletionEndpoint extends RpcEndpoint
            implements AsyncOperationGateway {

        /** Future returned by {@link #onStop()}. */
        private final CompletableFuture<Void> onStopFuture;

        /** Released by the test to let a blocked {@code asyncOperation} call return. */
        private final OneShotLatch blockAsyncOperation = new OneShotLatch();

        /** Triggered as soon as {@code asyncOperation} has been entered. */
        private final OneShotLatch enterAsyncOperation = new OneShotLatch();

        /** Number of {@code asyncOperation} invocations seen so far. */
        private final AtomicInteger asyncOperationCounter = new AtomicInteger(0);

        protected TerminatingAfterOnStopFutureCompletionEndpoint(
                RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);
            this.onStopFuture = onStopFuture;
        }

        @Override
        public CompletableFuture<Integer> asyncOperation(Duration timeout) {
            asyncOperationCounter.incrementAndGet();
            enterAsyncOperation.trigger();

            // stay inside the call until the test unblocks it
            try {
                blockAsyncOperation.await();
            } catch (InterruptedException interrupted) {
                throw new FlinkRuntimeException(interrupted);
            }

            return CompletableFuture.completedFuture(42);
        }

        public CompletableFuture<Void> onStop() {
            return onStopFuture;
        }

        /** Blocks until {@code asyncOperation} has been entered. */
        void awaitEnterAsyncOperation() throws InterruptedException {
            enterAsyncOperation.await();
        }

        /** Lets a blocked {@code asyncOperation} call proceed. */
        void triggerUnblockAsyncOperation() {
            blockAsyncOperation.trigger();
        }

        /** Returns how often {@code asyncOperation} has been invoked. */
        int getNumberAsyncOperationCalls() {
            return asyncOperationCounter.get();
        }
    }

    /** Gateway exposing an asynchronous operation that takes an explicit RPC timeout. */
    interface AsyncOperationGateway extends RpcGateway {

        /**
         * Triggers the asynchronous operation.
         *
         * @param timeout timeout of the call, marked with {@link RpcTimeout}
         * @return future carrying the integer result of the operation
         */
        CompletableFuture<Integer> asyncOperation(@RpcTimeout Duration timeout);
    }

    /**
     * Endpoint that signals when {@code onStart()} has been called and optionally rethrows a
     * given exception from it.
     */
    private static final class OnStartEndpoint extends RpcEndpoint {

        /** Counted down once {@link #onStart()} is entered. */
        private final CountDownLatch countDownLatch = new CountDownLatch(1);

        /** Exception to rethrow from {@link #onStart()}; may be null. */
        @Nullable private final Exception exception;

        OnStartEndpoint(RpcService rpcService, @Nullable Exception exception) {
            super(rpcService);
            this.exception = exception;

            // invoke closeAsync() once the termination future completes, however it completes
            getTerminationFuture().whenComplete((ignoredResult, ignoredFailure) -> closeAsync());
        }

        public void onStart() throws Exception {
            countDownLatch.countDown();

            ExceptionUtils.tryRethrowException(exception);
        }

        /** Blocks until {@link #onStart()} has been entered. */
        public void awaitUntilOnStartCalled() throws InterruptedException {
            countDownLatch.await();
        }
    }

    /** Endpoint that counts {@code onStop()} invocations and signals the first one via a latch. */
    private static final class OnStopCountingRpcEndpoint extends RpcEndpoint {

        /** Number of {@link #onStop()} invocations. */
        private final AtomicInteger numOnStopCalls = new AtomicInteger(0);

        /** Triggered whenever {@link #onStop()} is entered. */
        private final OneShotLatch onStopHasBeenCalled = new OneShotLatch();

        /** Future returned by {@link #onStop()}; completed by the test. */
        private final CompletableFuture<Void> onStopFuture;

        private OnStopCountingRpcEndpoint(
                RpcService rpcService, CompletableFuture<Void> onStopFuture) {
            super(rpcService);
            this.onStopFuture = onStopFuture;
        }

        protected CompletableFuture<Void> onStop() {
            onStopHasBeenCalled.trigger();
            numOnStopCalls.incrementAndGet();

            return onStopFuture;
        }

        private int getNumOnStopCalls() {
            return numOnStopCalls.get();
        }

        private void waitUntilOnStopHasBeenCalled() throws InterruptedException {
            onStopHasBeenCalled.await();
        }
    }

    /** Endpoint that answers both of its calls with null. */
    static class NullRespondingEndpoint extends RpcEndpoint implements NullRespondingGateway {

        protected NullRespondingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /** Returns an already completed future carrying null. */
        @Override
        public CompletableFuture<Integer> foobar() {
            return CompletableFuture.completedFuture(null);
        }

        /** Returns null directly. */
        @Override
        public Integer synchronousFoobar() {
            return null;
        }
    }

    static interface NullRespondingGateway
    extends DummyRpcGateway {
        public Integer synchronousFoobar();
    }

    /** Endpoint that answers with a fixed {@link SerializedValue}. */
    static class SerializedValueRespondingEndpoint extends RpcEndpoint
            implements SerializedValueRespondingGateway {

        /** The value returned by both gateway calls. */
        static final SerializedValue<String> SERIALIZED_VALUE;

        static {
            // creating the serialized value may fail with a checked IOException
            try {
                SERIALIZED_VALUE = new SerializedValue<>("string-value");
            } catch (IOException ioException) {
                throw new UncheckedIOException(ioException);
            }
        }

        public SerializedValueRespondingEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        @Override
        public CompletableFuture<SerializedValue<String>> getSerializedValue() {
            return CompletableFuture.completedFuture(SERIALIZED_VALUE);
        }

        @Override
        public SerializedValue<String> getSerializedValueSynchronously() {
            return SERIALIZED_VALUE;
        }
    }

    /** Gateway returning a {@link SerializedValue} both via a future and synchronously. */
    interface SerializedValueRespondingGateway extends RpcGateway {

        CompletableFuture<SerializedValue<String>> getSerializedValue();

        SerializedValue<String> getSerializedValueSynchronously();
    }

    /**
     * Endpoint that schedules a runnable and a callable with a fixed delay, and executes a third
     * task directly, all on its main thread executor.
     */
    private static final class SchedulingRpcEndpoint extends RpcEndpoint
            implements SchedulingRpcEndpointGateway {

        /** Delay, in milliseconds, applied to the two scheduled tasks. */
        static final int DELAY_MILLIS = 20;

        public SchedulingRpcEndpoint(RpcService rpcService) {
            super(rpcService);
        }

        /**
         * Schedules completion of the first two futures after {@link #DELAY_MILLIS} and submits
         * completion of the third one for direct execution.
         *
         * @param scheduleRunnableFuture completed by the scheduled expression lambda
         * @param scheduleCallableFuture completed by the scheduled value-returning lambda
         * @param executeFuture completed by the task passed to {@code execute}
         */
        @Override
        public void schedule(
                CompletableFuture<Void> scheduleRunnableFuture,
                CompletableFuture<Void> scheduleCallableFuture,
                CompletableFuture<Void> executeFuture) {
            getMainThreadExecutor()
                    .schedule(
                            () -> scheduleRunnableFuture.complete(null),
                            DELAY_MILLIS,
                            TimeUnit.MILLISECONDS);
            getMainThreadExecutor()
                    .schedule(
                            () -> {
                                scheduleCallableFuture.complete(null);
                                return null;
                            },
                            DELAY_MILLIS,
                            TimeUnit.MILLISECONDS);
            getMainThreadExecutor().execute(() -> executeFuture.complete(null));
        }
    }

    /** Gateway of the scheduling endpoint; its single call is annotated with {@link Local}. */
    interface SchedulingRpcEndpointGateway extends RpcGateway {

        /**
         * Asks the endpoint to complete the given futures from its scheduled and executed tasks.
         *
         * @param scheduleRunnableFuture future for the first scheduled task to complete
         * @param scheduleCallableFuture future for the second scheduled task to complete
         * @param executeFuture future for the directly executed task to complete
         */
        @Local
        void schedule(
                CompletableFuture<Void> scheduleRunnableFuture,
                CompletableFuture<Void> scheduleCallableFuture,
                CompletableFuture<Void> executeFuture);
    }

    /** Serializable object whose custom {@code readObject} hook always throws. */
    private static class DeserializationFailingObject implements Serializable {

        private DeserializationFailingObject() {
        }

        /** Fails every deserialization attempt with a {@link ClassNotFoundException}. */
        private void readObject(ObjectInputStream aInputStream)
                throws ClassNotFoundException, IOException {
            throw new ClassNotFoundException("test exception");
        }
    }
}

