/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActorTest;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import scala.concurrent.Future;

class PekkoRpcServiceTest {
    private static ActorSystem actorSystem;
    private static PekkoRpcService pekkoRpcService;

    PekkoRpcServiceTest() {
    }

    @BeforeAll
    static void setup() {
        actorSystem = PekkoUtils.createDefaultActorSystem();
        pekkoRpcService = new PekkoRpcService(actorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterAll
    static void shutdown() throws InterruptedException, ExecutionException, TimeoutException {
        CompletableFuture rpcTerminationFuture = pekkoRpcService.closeAsync();
        CompletableFuture actorSystemTerminationFuture = ScalaFutureUtils.toJava((Future)actorSystem.terminate());
        FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)).get();
        actorSystem = null;
        pekkoRpcService = null;
    }

    @Test
    void testScheduleRunnable() throws Exception {
        OneShotLatch latch = new OneShotLatch();
        long delay = 100L;
        long start = System.nanoTime();
        ScheduledFuture scheduledFuture = pekkoRpcService.getScheduledExecutor().schedule(() -> ((OneShotLatch)latch).trigger(), 100L, TimeUnit.MILLISECONDS);
        scheduledFuture.get();
        Assertions.assertThat((boolean)latch.isTriggered()).isTrue();
        long stop = System.nanoTime();
        ((AbstractLongAssert)Assertions.assertThat((long)((stop - start) / 1000000L)).as("call was not properly delayed", new Object[0])).isGreaterThanOrEqualTo(100L);
    }

    @Test
    void testExecuteRunnable() throws Exception {
        OneShotLatch latch = new OneShotLatch();
        pekkoRpcService.getScheduledExecutor().execute(() -> ((OneShotLatch)latch).trigger());
        latch.await(30L, TimeUnit.SECONDS);
    }

    @Test
    void testGetAddress() {
        Assertions.assertThat((String)pekkoRpcService.getAddress()).isEqualTo((String)PekkoUtils.getAddress((ActorSystem)actorSystem).host().get());
    }

    @Test
    void testGetPort() {
        Assertions.assertThat((int)pekkoRpcService.getPort()).isEqualTo(PekkoUtils.getAddress((ActorSystem)actorSystem).port().get());
    }

    @Test
    void testScheduledExecutorServiceSimpleSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = pekkoRpcService.getScheduledExecutor();
        OneShotLatch latch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.schedule(() -> ((OneShotLatch)latch).trigger(), 10L, TimeUnit.MILLISECONDS);
        future.get();
        Assertions.assertThat((boolean)latch.isTriggered()).isTrue();
    }

    @Test
    void testScheduledExecutorServicePeriodicSchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = pekkoRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleAtFixedRate(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((java.util.concurrent.Future)future).isNotDone();
        countDownLatch.await();
        Assertions.assertThat((java.util.concurrent.Future)future).isNotDone();
        long finalTime = System.nanoTime() - currentTime;
        Assertions.assertThat((long)finalTime).isGreaterThanOrEqualTo(40L);
        future.cancel(true);
    }

    @Test
    void testScheduledExecutorServiceWithFixedDelaySchedule() throws Exception {
        ScheduledExecutor scheduledExecutor = pekkoRpcService.getScheduledExecutor();
        int tries = 4;
        long delay = 10L;
        CountDownLatch countDownLatch = new CountDownLatch(4);
        long currentTime = System.nanoTime();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(countDownLatch::countDown, 10L, 10L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((java.util.concurrent.Future)future).isNotDone();
        countDownLatch.await();
        Assertions.assertThat((java.util.concurrent.Future)future).isNotDone();
        long finalTime = System.nanoTime() - currentTime;
        Assertions.assertThat((long)finalTime).isGreaterThanOrEqualTo(40L);
        future.cancel(true);
    }

    @Test
    void testScheduledExecutorServiceCancelWithFixedDelay() throws InterruptedException {
        ScheduledExecutor scheduledExecutor = pekkoRpcService.getScheduledExecutor();
        long delay = 10L;
        OneShotLatch futureTask = new OneShotLatch();
        OneShotLatch latch = new OneShotLatch();
        OneShotLatch shouldNotBeTriggeredLatch = new OneShotLatch();
        ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> {
            try {
                if (futureTask.isTriggered()) {
                    shouldNotBeTriggeredLatch.trigger();
                } else {
                    futureTask.trigger();
                    latch.await();
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }, delay, delay, TimeUnit.MILLISECONDS);
        futureTask.await();
        future.cancel(false);
        latch.trigger();
        Assertions.assertThatThrownBy(() -> shouldNotBeTriggeredLatch.await(5L * delay, TimeUnit.MILLISECONDS)).isInstanceOf(TimeoutException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRpcServiceShutDownWithRpcEndpoints() throws Exception {
        PekkoRpcService pekkoRpcService = this.startRpcService();
        try {
            int numberActors = 5;
            RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper = PekkoRpcServiceTest.startStopNCountingAsynchronousOnStopEndpoints(pekkoRpcService, 5);
            for (CompletableFuture<Void> onStopFuture : rpcServiceShutdownTestHelper.getStopFutures()) {
                onStopFuture.complete(null);
            }
            rpcServiceShutdownTestHelper.waitForRpcServiceTermination();
            Assertions.assertThat((boolean)pekkoRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcService((RpcService[])new RpcService[]{pekkoRpcService});
            throw throwable;
        }
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{pekkoRpcService});
    }

    @Test
    void testRpcServiceShutDownWithFailingRpcEndpoints() throws Exception {
        PekkoRpcService pekkoRpcService = this.startRpcService();
        int numberActors = 5;
        RpcServiceShutdownTestHelper rpcServiceShutdownTestHelper = PekkoRpcServiceTest.startStopNCountingAsynchronousOnStopEndpoints(pekkoRpcService, 5);
        Iterator<CompletableFuture<Void>> iterator = rpcServiceShutdownTestHelper.getStopFutures().iterator();
        for (int i = 0; i < 4; ++i) {
            iterator.next().complete(null);
        }
        iterator.next().completeExceptionally((Throwable)((Object)new OnStopException("onStop exception occurred.")));
        Assertions.assertThatThrownBy(rpcServiceShutdownTestHelper::waitForRpcServiceTermination).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(OnStopException.class)});
        Assertions.assertThat((boolean)pekkoRpcService.getActorSystem().whenTerminated().isCompleted()).isTrue();
    }

    @Test
    void failsRpcResultImmediatelyIfEndpointIsStopped() throws Exception {
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint = new PekkoRpcActorTest.SerializedValueRespondingEndpoint((RpcService)pekkoRpcService);){
            endpoint.start();
            PekkoRpcActorTest.SerializedValueRespondingGateway gateway = (PekkoRpcActorTest.SerializedValueRespondingGateway)pekkoRpcService.connect(endpoint.getAddress(), PekkoRpcActorTest.SerializedValueRespondingGateway.class).join();
            endpoint.close();
            Assertions.assertThatThrownBy(() -> gateway.getSerializedValue().join()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class)});
        }
    }

    private static RpcServiceShutdownTestHelper startStopNCountingAsynchronousOnStopEndpoints(PekkoRpcService pekkoRpcService, int numberActors) throws InterruptedException {
        ArrayList<CompletableFuture<Void>> onStopFutures = new ArrayList<CompletableFuture<Void>>(numberActors);
        CountDownLatch countDownLatch = new CountDownLatch(numberActors);
        for (int i = 0; i < numberActors; ++i) {
            CompletableFuture<Void> onStopFuture = new CompletableFuture<Void>();
            CountingAsynchronousOnStopEndpoint endpoint = new CountingAsynchronousOnStopEndpoint((RpcService)pekkoRpcService, onStopFuture, countDownLatch);
            endpoint.start();
            onStopFutures.add(onStopFuture);
        }
        CompletableFuture terminationFuture = pekkoRpcService.closeAsync();
        countDownLatch.await();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        Assertions.assertThat((boolean)pekkoRpcService.getActorSystem().whenTerminated().isCompleted()).isFalse();
        return new RpcServiceShutdownTestHelper(Collections.unmodifiableCollection(onStopFutures), terminationFuture);
    }

    @Nonnull
    private PekkoRpcService startRpcService() {
        ActorSystem actorSystem = PekkoUtils.createDefaultActorSystem();
        return new PekkoRpcService(actorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    private static class RpcServiceShutdownTestHelper {
        private final Collection<CompletableFuture<Void>> stopFutures;
        private final CompletableFuture<Void> terminationFuture;

        public RpcServiceShutdownTestHelper(Collection<CompletableFuture<Void>> stopFutures, CompletableFuture<Void> terminationFuture) {
            this.stopFutures = stopFutures;
            this.terminationFuture = terminationFuture;
        }

        public Collection<CompletableFuture<Void>> getStopFutures() {
            return this.stopFutures;
        }

        public void waitForRpcServiceTermination() throws ExecutionException, InterruptedException {
            this.terminationFuture.get();
        }
    }

    private static class OnStopException
    extends FlinkException {
        private static final long serialVersionUID = 7136609202083168954L;

        public OnStopException(String message) {
            super(message);
        }
    }

    private static class CountingAsynchronousOnStopEndpoint
    extends PekkoRpcActorTest.AsynchronousOnStopEndpoint {
        private final CountDownLatch countDownLatch;

        protected CountingAsynchronousOnStopEndpoint(RpcService rpcService, CompletableFuture<Void> onStopFuture, CountDownLatch countDownLatch) {
            super(rpcService, onStopFuture);
            this.countDownLatch = countDownLatch;
        }

        @Override
        public CompletableFuture<Void> onStop() {
            this.countDownLatch.countDown();
            return super.onStop();
        }
    }
}

