/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.ActorSystem;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import scala.concurrent.Future;

class TimeoutCallStackTest {
    private static ActorSystem actorSystem;
    private static RpcService rpcService;
    private final List<RpcEndpoint> endpointsToStop = new ArrayList<RpcEndpoint>();

    TimeoutCallStackTest() {
    }

    @BeforeAll
    static void setup() {
        actorSystem = PekkoUtils.createDefaultActorSystem();
        rpcService = new PekkoRpcService(actorSystem, PekkoRpcServiceConfiguration.defaultConfiguration());
    }

    @AfterAll
    static void teardown() throws Exception {
        CompletableFuture rpcTerminationFuture = rpcService.closeAsync();
        CompletableFuture actorSystemTerminationFuture = ScalaFutureUtils.toJava((Future)actorSystem.terminate());
        FutureUtils.waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)).get(10000L, TimeUnit.MILLISECONDS);
    }

    @AfterEach
    void stopTestEndpoints() {
        this.endpointsToStop.forEach(IOUtils::closeQuietly);
    }

    @Test
    void testTimeoutExceptionWithTime() throws Exception {
        this.testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1L)));
    }

    @Test
    void testTimeoutExceptionWithDuration() throws Exception {
        this.testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1L)));
    }

    private void testTimeoutException(Function<TestingGateway, CompletableFuture<Void>> timeoutOperation) throws Exception {
        TestingGateway gateway = this.createTestingGateway();
        CompletableFuture<Void> future = timeoutOperation.apply(gateway);
        Assertions.assertThatThrownBy(future::get).hasCauseInstanceOf(TimeoutException.class).hasStackTraceContaining("testTimeoutException").extracting(Throwable::getCause).extracting(Throwable::getMessage).satisfies(new ThrowingConsumer[]{s -> Assertions.assertThat((String)s).contains(new CharSequence[]{"callThatTimesOut"})});
    }

    private TestingGateway createTestingGateway() throws Exception {
        TestingRpcEndpoint endpoint = new TestingRpcEndpoint(rpcService, "test_name");
        this.endpointsToStop.add(endpoint);
        endpoint.start();
        return (TestingGateway)rpcService.connect(endpoint.getAddress(), TestingGateway.class).get();
    }

    private static interface TestingGateway
    extends RpcGateway {
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration var1);
    }

    private static final class TestingRpcEndpoint
    extends RpcEndpoint
    implements TestingGateway {
        TestingRpcEndpoint(RpcService rpcService, String endpointId) {
            super(rpcService, endpointId);
        }

        @Override
        public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout) {
            return new CompletableFuture<Void>();
        }
    }
}

