/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.URL;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.concurrent.Future;

class ContextClassLoadingSettingTest {
    // Class loader installed as the caller's context class loader (through
    // ClassLoadingUtils.runWithContextClassLoader) by the future-completion tests.
    // assertIsFlinkClassLoader also accepts it as a valid result.
    private static final ClassLoader testClassLoader = new URLClassLoader(new URL[0], ContextClassLoadingSettingTest.class.getClassLoader());
    // Stand-in for the Flink class loader; created in setup() and passed to the PekkoRpcService.
    private ClassLoader pretendFlinkClassLoader;
    // Actor system backing pekkoRpcService; created in setup(), terminated in shutdown().
    private ActorSystem actorSystem;
    // RPC service under test; created in setup(), closed in shutdown().
    private PekkoRpcService pekkoRpcService;

    // Explicit no-arg constructor with an empty body; the fixtures are assigned in setup().
    ContextClassLoadingSettingTest() {
    }

    /**
     * Creates a fresh "pretend Flink" class loader, actor system and RPC service before each
     * test, and points {@code PickyObject.classLoaderAssertion} at
     * {@code assertIsFlinkClassLoader}.
     */
    @BeforeEach
    void setup() {
        final ClassLoader parentClassLoader = ContextClassLoadingSettingTest.class.getClassLoader();
        pretendFlinkClassLoader = new URLClassLoader(new URL[0], parentClassLoader);

        actorSystem = PekkoUtils.createDefaultActorSystem();
        pekkoRpcService =
                new PekkoRpcService(
                        actorSystem,
                        PekkoRpcServiceConfiguration.defaultConfiguration(),
                        pretendFlinkClassLoader);

        PickyObject.classLoaderAssertion = this::assertIsFlinkClassLoader;
    }

    /**
     * Closes the RPC service and terminates the actor system, waits for both to finish, then
     * clears the fixture references.
     *
     * @throws InterruptedException if the wait for termination is interrupted
     * @throws ExecutionException if waiting on the combined termination future fails
     */
    @AfterEach
    void shutdown() throws InterruptedException, ExecutionException {
        final CompletableFuture<?> rpcServiceTermination = pekkoRpcService.closeAsync();
        final CompletableFuture<?> actorSystemTermination =
                ScalaFutureUtils.toJava(actorSystem.terminate());

        // Both shutdowns are triggered first and then awaited together.
        FutureUtils.waitForAll(Arrays.asList(rpcServiceTermination, actorSystemTermination)).get();

        actorSystem = null;
        pekkoRpcService = null;
    }

    /**
     * A runnable passed to {@code execute} on the RPC service's scheduled executor must observe
     * the pretend Flink class loader as its context class loader.
     */
    @Test
    void testRpcService_ExecuteRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final CompletableFuture<ClassLoader> observedClassLoader = new CompletableFuture<>();
        final Runnable recordContextClassLoader =
                () -> observedClassLoader.complete(Thread.currentThread().getContextClassLoader());

        pekkoRpcService.getScheduledExecutor().execute(recordContextClassLoader);

        Assertions.assertThat(observedClassLoader.get()).isSameAs(pretendFlinkClassLoader);
    }

    /**
     * A value-returning task scheduled with zero delay must observe the pretend Flink class
     * loader as its context class loader.
     */
    @Test
    void testRpcService_ScheduleCallableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final ClassLoader observedClassLoader =
                pekkoRpcService
                        .getScheduledExecutor()
                        .schedule(
                                () -> {
                                    // Value-returning body: the task reports the class loader it ran with.
                                    return Thread.currentThread().getContextClassLoader();
                                },
                                0L,
                                TimeUnit.MILLISECONDS)
                        .get();

        Assertions.assertThat(observedClassLoader).isSameAs(pretendFlinkClassLoader);
    }

    /**
     * A {@link Runnable} scheduled with a short delay must observe the pretend Flink class
     * loader as its context class loader.
     *
     * <p>The task is declared with the explicit type {@code Runnable}. An inline expression
     * lambda such as {@code () -> future.complete(..)} also returns a value, so where a
     * {@code Callable} overload of {@code schedule} exists the compiler prefers that one and the
     * Runnable path would not be exercised.
     */
    @Test
    void testRpcService_ScheduleRunnableSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final CompletableFuture<ClassLoader> contextClassLoader = new CompletableFuture<>();
        final Runnable recordContextClassLoader =
                () -> contextClassLoader.complete(Thread.currentThread().getContextClassLoader());

        this.pekkoRpcService
                .getScheduledExecutor()
                .schedule(recordContextClassLoader, 5L, TimeUnit.MILLISECONDS);

        Assertions.assertThat(contextClassLoader.get()).isSameAs(this.pretendFlinkClassLoader);
    }

    /**
     * Every run of a fixed-rate task must observe the pretend Flink class loader as its context
     * class loader.
     *
     * <p>The task records its context class loader for the first {@code numberOfScheduledRuns}
     * runs. On the following run it completes {@code terminalFuture} and throws, which is
     * intended to stop further executions.
     */
    @Test
    void testRpcService_ScheduleRunnableWithFixedRateSetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final int numberOfScheduledRuns = 2;
        final ArrayList<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
        final CompletableFuture<Void> terminalFuture = new CompletableFuture<>();

        this.pekkoRpcService.getScheduledExecutor().scheduleAtFixedRate(() -> {
            if (contextClassLoaders.size() >= numberOfScheduledRuns) {
                // Enough samples collected: release the test thread, then abort the task.
                terminalFuture.complete(null);
                throw new RuntimeException("cancel task");
            }
            contextClassLoaders.add(Thread.currentThread().getContextClassLoader());
        }, 0L, 1L, TimeUnit.MILLISECONDS);

        // Block until the task signals that all samples have been recorded.
        terminalFuture.get();

        Assertions.assertThat(contextClassLoaders)
                .hasSize(numberOfScheduledRuns)
                .allSatisfy(classLoader -> Assertions.assertThat(classLoader).isSameAs(this.pretendFlinkClassLoader));
    }

    /**
     * Every run of a fixed-delay task must observe the pretend Flink class loader as its context
     * class loader.
     *
     * <p>The task records its context class loader for the first {@code numberOfScheduledRuns}
     * runs. On the following run it completes {@code terminalFuture} and throws, which is
     * intended to stop further executions.
     */
    @Test
    void testRpcService_ScheduleRunnableWithFixedDelaySetsFlinkContextClassLoader() throws ExecutionException, InterruptedException {
        final int numberOfScheduledRuns = 2;
        final ArrayList<ClassLoader> contextClassLoaders = new ArrayList<>(numberOfScheduledRuns);
        final CompletableFuture<Void> terminalFuture = new CompletableFuture<>();

        this.pekkoRpcService.getScheduledExecutor().scheduleWithFixedDelay(() -> {
            if (contextClassLoaders.size() >= numberOfScheduledRuns) {
                // Enough samples collected: release the test thread, then abort the task.
                terminalFuture.complete(null);
                throw new RuntimeException("cancel task");
            }
            contextClassLoaders.add(Thread.currentThread().getContextClassLoader());
        }, 0L, 1L, TimeUnit.MILLISECONDS);

        // Block until the task signals that all samples have been recorded.
        terminalFuture.get();

        Assertions.assertThat(contextClassLoaders)
                .hasSize(numberOfScheduledRuns)
                .allSatisfy(classLoader -> Assertions.assertThat(classLoader).isSameAs(this.pretendFlinkClassLoader));
    }

    /**
     * A callback chained onto the {@code connect} future, registered while the caller's context
     * class loader is {@code testClassLoader}, must run with a class loader accepted by
     * {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcService_ConnectFutureCompletedWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            final ClassLoader observedClassLoader =
                    ClassLoadingUtils.runWithContextClassLoader(
                            () -> {
                                return pekkoRpcService
                                        .connect(endpoint.getAddress(), TestEndpointGateway.class)
                                        .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                        .get();
                            },
                            testClassLoader);

            assertIsFlinkClassLoader(observedClassLoader);
        }
    }

    /**
     * A callback chained onto the RPC service's {@code closeAsync} future, registered while the
     * caller's context class loader is {@code testClassLoader}, must run with a class loader
     * accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcService_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        final ClassLoader observedClassLoader =
                ClassLoadingUtils.runWithContextClassLoader(
                        () -> {
                            return pekkoRpcService
                                    .closeAsync()
                                    .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                    .get();
                        },
                        testClassLoader);

        assertIsFlinkClassLoader(observedClassLoader);
    }

    /**
     * {@code onStart} of a started endpoint must run with a class loader accepted by
     * {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_OnStartCalledWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();

            final ClassLoader onStartClassLoader = endpoint.onStartClassLoader.get();
            assertIsFlinkClassLoader(onStartClassLoader);
        }
    }

    /**
     * {@code onStop} of an endpoint that was started and then closed must run with a class
     * loader accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_OnStopCalledWithFlinkContextClassLoader() throws Exception {
        final TestEndpoint endpoint = new TestEndpoint(pekkoRpcService);
        endpoint.start();
        // The endpoint is closed explicitly so that the onStop hook is triggered.
        endpoint.close();

        final ClassLoader onStopClassLoader = endpoint.onStopClassLoader.get();
        assertIsFlinkClassLoader(onStopClassLoader);
    }

    /**
     * A callable submitted through the endpoint's {@code callAsync} must run with a class loader
     * accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_CallAsyncCalledWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();

            final CompletableFuture<ClassLoader> callAsyncClassLoader = endpoint.doCallAsync();
            assertIsFlinkClassLoader(callAsyncClassLoader.get());
        }
    }

    /**
     * A runnable submitted through the endpoint's {@code runAsync} must run with a class loader
     * accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_RunAsyncCalledWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();

            final CompletableFuture<ClassLoader> runAsyncClassLoader = endpoint.doRunAsync();
            assertIsFlinkClassLoader(runAsyncClassLoader.get());
        }
    }

    /**
     * A void RPC invoked through a connected gateway must execute on the endpoint with a class
     * loader accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_RPCReturningVoidCalledWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();

            final TestEndpointGateway gateway =
                    pekkoRpcService.connect(endpoint.getAddress(), TestEndpointGateway.class).get();
            gateway.doSomethingWithoutReturningAnything();

            // The endpoint records the class loader seen by the void RPC in this future.
            final ClassLoader voidRpcClassLoader = endpoint.voidOperationClassLoader.get();
            assertIsFlinkClassLoader(voidRpcClassLoader);
        }
    }

    /**
     * A value-returning RPC invoked through a connected gateway must execute on the endpoint
     * with a class loader accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testRpcActor_RPCCalledWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();

            final TestEndpointGateway gateway =
                    pekkoRpcService.connect(endpoint.getAddress(), TestEndpointGateway.class).get();
            final CompletableFuture<ClassLoader> rpcClassLoader = gateway.getContextClassLoader();

            assertIsFlinkClassLoader(rpcClassLoader.get());
        }
    }

    /**
     * A callback chained onto an RPC result future, registered while the caller's context class
     * loader is {@code testClassLoader}, must run with a class loader accepted by
     * {@code assertIsFlinkClassLoader} once the endpoint completes the RPC.
     */
    @Test
    void testRpcInvocationHandler_RPCFutureCompletedWithFlinkContextClassLoader() throws Exception {
        try (TestEndpoint endpoint = new TestEndpoint(pekkoRpcService)) {
            endpoint.start();
            final TestEndpointGateway gateway =
                    pekkoRpcService.connect(endpoint.getAddress(), TestEndpointGateway.class).get();

            // Register the callback first; the endpoint-side future is only completed afterwards.
            final CompletableFuture<ClassLoader> callbackClassLoader =
                    ClassLoadingUtils.runWithContextClassLoader(
                            () -> {
                                return gateway
                                        .doSomethingAsync()
                                        .thenApply(ignored -> Thread.currentThread().getContextClassLoader());
                            },
                            testClassLoader);

            endpoint.completeRPCFuture();

            assertIsFlinkClassLoader(callbackClassLoader.get());
        }
    }

    /**
     * Fetches a {@link PickyObject} from a server-side endpoint through a separate client RPC
     * service that was built with the pretend Flink class loader. {@code PickyObject.readObject}
     * runs {@code PickyObject.classLoaderAssertion} against the context class loader of the
     * deserializing thread.
     *
     * <p>Both RPC services are terminated in {@code finally} blocks, client first and then
     * server. Cleanup therefore runs exactly once on every path, and the server is still
     * terminated when creating or terminating the client fails.
     */
    @Test
    void testRpcInvocationHandler_ContextClassLoaderUsedForDeserialization() throws Exception {
        final PekkoRpcService serverPekkoRpcService =
                new PekkoRpcService(
                        PekkoUtils.createActorSystem(
                                "serverActorSystem",
                                PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 0))),
                        PekkoRpcServiceConfiguration.defaultConfiguration());
        try {
            final PekkoRpcService clientPekkoRpcService =
                    new PekkoRpcService(
                            PekkoUtils.createActorSystem(
                                    "clientActorSystem",
                                    PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 0))),
                            PekkoRpcServiceConfiguration.defaultConfiguration(),
                            this.pretendFlinkClassLoader);
            try {
                final TestEndpoint rpcEndpoint = new TestEndpoint(serverPekkoRpcService, new PickyObject());
                rpcEndpoint.start();

                final TestEndpointGateway rpcGateway = rpcEndpoint.getSelfGateway(TestEndpointGateway.class);
                final TestEndpointGateway connect =
                        clientPekkoRpcService.connect(rpcGateway.getAddress(), TestEndpointGateway.class).get();

                // Retrieving the result is what makes the client deserialize the PickyObject.
                connect.getPickyObject().get();
            } finally {
                RpcUtils.terminateRpcService(clientPekkoRpcService);
            }
        } finally {
            RpcUtils.terminateRpcService(serverPekkoRpcService);
        }
    }

    /**
     * A callback chained onto an endpoint's {@code closeAsync} future, registered while the
     * caller's context class loader is {@code testClassLoader}, must run with a class loader
     * accepted by {@code assertIsFlinkClassLoader}.
     */
    @Test
    void testSupervisorActor_TerminationFutureCompletedWithFlinkContextClassLoader() throws Exception {
        final TestEndpoint endpoint = new TestEndpoint(pekkoRpcService);
        endpoint.start();

        final ClassLoader observedClassLoader =
                ClassLoadingUtils.runWithContextClassLoader(
                        () -> {
                            return endpoint
                                    .closeAsync()
                                    .thenApply(ignored -> Thread.currentThread().getContextClassLoader())
                                    .get();
                        },
                        testClassLoader);

        assertIsFlinkClassLoader(observedClassLoader);
    }

    /**
     * Asserts that {@code classLoader} is the same instance as either
     * {@code pretendFlinkClassLoader} or {@code testClassLoader}.
     *
     * <p>NOTE(review): {@code testClassLoader} is presumably accepted because a callback
     * attached to an already-completed future runs on the calling thread. Confirm before
     * tightening this check.
     *
     * @param classLoader the class loader observed by the code under test
     */
    private void assertIsFlinkClassLoader(ClassLoader classLoader) {
        final ThrowingConsumer<Object> isPretendFlinkClassLoader =
                candidate -> Assertions.assertThat(candidate).isSameAs(this.pretendFlinkClassLoader);
        final ThrowingConsumer<Object> isTestClassLoader =
                candidate -> Assertions.assertThat(candidate).isSameAs(testClassLoader);

        Assertions.assertThat((Object) classLoader)
                .satisfiesAnyOf(isPretendFlinkClassLoader, isTestClassLoader);
    }

    /**
     * Serializable payload that, when deserialized, runs {@link #classLoaderAssertion} against
     * the deserializing thread's context class loader.
     */
    private static class PickyObject implements Serializable {
        /** Check applied during deserialization; {@code null} until a test installs one. */
        static Consumer<ClassLoader> classLoaderAssertion;

        private PickyObject() {
        }

        /**
         * Deserialization hook: reads nothing from the stream and only hands the current
         * thread's context class loader to {@link #classLoaderAssertion}.
         */
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            final ClassLoader deserializingContextClassLoader = Thread.currentThread().getContextClassLoader();
            classLoaderAssertion.accept(deserializingContextClassLoader);
        }
    }

    /**
     * RPC endpoint that records the context class loader seen by its lifecycle hooks and RPC
     * methods, so tests can inspect which class loader each one ran with.
     */
    private static class TestEndpoint extends RpcEndpoint implements TestEndpointGateway {
        /** Completed in {@link #onStart()} with the context class loader seen there. */
        private final CompletableFuture<ClassLoader> onStartClassLoader = new CompletableFuture<>();

        /** Completed in {@link #onStop()} with the context class loader seen there. */
        private final CompletableFuture<ClassLoader> onStopClassLoader = new CompletableFuture<>();

        /** Completed in {@link #doSomethingWithoutReturningAnything()}. */
        private final CompletableFuture<ClassLoader> voidOperationClassLoader = new CompletableFuture<>();

        /** Returned by {@link #doSomethingAsync()}; completed by {@link #completeRPCFuture()}. */
        private final CompletableFuture<Void> rpcResponseFuture = new CompletableFuture<>();

        /** Payload returned by {@link #getPickyObject()}; may be {@code null}. */
        @Nullable
        private final PickyObject pickyObject;

        /** Creates an endpoint without a {@link PickyObject} payload. */
        protected TestEndpoint(RpcService rpcService) {
            this(rpcService, null);
        }

        /** Creates an endpoint that returns {@code pickyObject} from {@link #getPickyObject()}. */
        protected TestEndpoint(RpcService rpcService, @Nullable PickyObject pickyObject) {
            super(rpcService);
            this.pickyObject = pickyObject;
        }

        /** Returns the context class loader of the thread that calls this method. */
        private static ClassLoader currentContextClassLoader() {
            return Thread.currentThread().getContextClassLoader();
        }

        @Override
        protected void onStart() throws Exception {
            onStartClassLoader.complete(currentContextClassLoader());
            super.onStart();
        }

        @Override
        protected CompletableFuture<Void> onStop() {
            onStopClassLoader.complete(currentContextClassLoader());
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<Void> doSomethingAsync() {
            // Left incomplete until completeRPCFuture() is called.
            return rpcResponseFuture;
        }

        /** Submits a callable through {@code callAsync} that reports its context class loader. */
        public CompletableFuture<ClassLoader> doCallAsync() {
            return callAsync(TestEndpoint::currentContextClassLoader, Duration.ofSeconds(10L));
        }

        /** Submits a runnable through {@code runAsync} that reports its context class loader. */
        public CompletableFuture<ClassLoader> doRunAsync() {
            final CompletableFuture<ClassLoader> observedClassLoader = new CompletableFuture<>();
            runAsync(() -> observedClassLoader.complete(currentContextClassLoader()));
            return observedClassLoader;
        }

        @Override
        public void doSomethingWithoutReturningAnything() {
            voidOperationClassLoader.complete(currentContextClassLoader());
        }

        @Override
        public CompletableFuture<PickyObject> getPickyObject() {
            return CompletableFuture.completedFuture(pickyObject);
        }

        /** Completes the future previously handed out by {@link #doSomethingAsync()}. */
        public void completeRPCFuture() {
            rpcResponseFuture.complete(null);
        }

        @Override
        @Local
        public CompletableFuture<ClassLoader> getContextClassLoader() {
            return CompletableFuture.completedFuture(currentContextClassLoader());
        }
    }

    private static interface TestEndpointGateway
    extends RpcGateway {
        public CompletableFuture<ClassLoader> getContextClassLoader();

        public CompletableFuture<Void> doSomethingAsync();

        public void doSomethingWithoutReturningAnything();

        public CompletableFuture<PickyObject> getPickyObject();
    }
}

