/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActorTest;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class RemotePekkoRpcActorTest {
    private static PekkoRpcService rpcService;
    private static PekkoRpcService otherRpcService;
    private static final Configuration configuration;

    RemotePekkoRpcActorTest() {
    }

    @BeforeAll
    static void setupClass() throws Exception {
        rpcService = PekkoRpcServiceUtils.createRemoteRpcService((Configuration)configuration, (String)"localhost", (String)"0", null, Optional.empty());
        otherRpcService = PekkoRpcServiceUtils.createRemoteRpcService((Configuration)configuration, (String)"localhost", (String)"0", null, Optional.empty());
    }

    @AfterAll
    static void teardownClass() throws InterruptedException, ExecutionException, TimeoutException {
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService, otherRpcService});
    }

    @Test
    void canRespondWithNullValueRemotely() throws Exception {
        try (PekkoRpcActorTest.NullRespondingEndpoint nullRespondingEndpoint = new PekkoRpcActorTest.NullRespondingEndpoint((RpcService)rpcService);){
            nullRespondingEndpoint.start();
            PekkoRpcActorTest.NullRespondingGateway rpcGateway = (PekkoRpcActorTest.NullRespondingGateway)otherRpcService.connect(nullRespondingEndpoint.getAddress(), PekkoRpcActorTest.NullRespondingGateway.class).join();
            CompletableFuture<Integer> nullValuedResponseFuture = rpcGateway.foobar();
            Assertions.assertThat((Integer)nullValuedResponseFuture.join()).isNull();
        }
    }

    @Test
    void canRespondWithSynchronousNullValueRemotely() throws Exception {
        try (PekkoRpcActorTest.NullRespondingEndpoint nullRespondingEndpoint = new PekkoRpcActorTest.NullRespondingEndpoint((RpcService)rpcService);){
            nullRespondingEndpoint.start();
            PekkoRpcActorTest.NullRespondingGateway rpcGateway = (PekkoRpcActorTest.NullRespondingGateway)otherRpcService.connect(nullRespondingEndpoint.getAddress(), PekkoRpcActorTest.NullRespondingGateway.class).join();
            Integer value = rpcGateway.synchronousFoobar();
            Assertions.assertThat((Integer)value).isNull();
        }
    }

    @Test
    void canRespondWithSerializedValueRemotely() throws Exception {
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint = new PekkoRpcActorTest.SerializedValueRespondingEndpoint((RpcService)rpcService);){
            endpoint.start();
            PekkoRpcActorTest.SerializedValueRespondingGateway remoteGateway = (PekkoRpcActorTest.SerializedValueRespondingGateway)otherRpcService.connect(endpoint.getAddress(), PekkoRpcActorTest.SerializedValueRespondingGateway.class).join();
            Assertions.assertThat(remoteGateway.getSerializedValueSynchronously()).isEqualTo(PekkoRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
            CompletableFuture<SerializedValue<String>> responseFuture = remoteGateway.getSerializedValue();
            Assertions.assertThat(responseFuture.get()).isEqualTo(PekkoRpcActorTest.SerializedValueRespondingEndpoint.SERIALIZED_VALUE);
        }
    }

    @Test
    void failsRpcResultImmediatelyIfEndpointIsStopped() throws Exception {
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint = new PekkoRpcActorTest.SerializedValueRespondingEndpoint((RpcService)rpcService);){
            endpoint.start();
            PekkoRpcActorTest.SerializedValueRespondingGateway gateway = (PekkoRpcActorTest.SerializedValueRespondingGateway)otherRpcService.connect(endpoint.getAddress(), PekkoRpcActorTest.SerializedValueRespondingGateway.class).join();
            endpoint.close();
            Assertions.assertThatThrownBy(() -> gateway.getSerializedValue().join()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class)});
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void failsRpcResultImmediatelyIfRemoteRpcServiceIsNotAvailable() throws Exception {
        PekkoRpcService toBeClosedRpcService = PekkoRpcServiceUtils.createRemoteRpcService((Configuration)configuration, (String)"localhost", (String)"0", null, Optional.empty());
        try (PekkoRpcActorTest.SerializedValueRespondingEndpoint endpoint = new PekkoRpcActorTest.SerializedValueRespondingEndpoint((RpcService)toBeClosedRpcService);){
            endpoint.start();
            PekkoRpcActorTest.SerializedValueRespondingGateway gateway = (PekkoRpcActorTest.SerializedValueRespondingGateway)otherRpcService.connect(endpoint.getAddress(), PekkoRpcActorTest.SerializedValueRespondingGateway.class).join();
            toBeClosedRpcService.closeAsync().join();
            Assertions.assertThatThrownBy(() -> gateway.getSerializedValue().join()).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RecipientUnreachableException.class)});
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcService((RpcService[])new RpcService[]{toBeClosedRpcService});
            throw throwable;
        }
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{toBeClosedRpcService});
    }

    static {
        configuration = new Configuration();
    }
}

