/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractObjectAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class PekkoRpcActorOversizedResponseMessageTest {
    private static final int FRAMESIZE = 32000;
    private static final String OVERSIZED_PAYLOAD = new String(new byte[32000]);
    private static final String PAYLOAD = "Hello";
    private static RpcService rpcService1;
    private static RpcService rpcService2;

    PekkoRpcActorOversizedResponseMessageTest() {
    }

    @BeforeAll
    static void setupClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(RpcOptions.FORCE_RPC_INVOCATION_SERIALIZATION, (Object)false);
        configuration.set(RpcOptions.FRAMESIZE, (Object)"32000 b");
        rpcService1 = PekkoRpcServiceUtils.remoteServiceBuilder((Configuration)configuration, (String)"localhost", (int)0).createAndStart();
        rpcService2 = PekkoRpcServiceUtils.remoteServiceBuilder((Configuration)configuration, (String)"localhost", (int)0).createAndStart();
    }

    @AfterAll
    static void teardownClass() throws Exception {
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService1, rpcService2});
    }

    @Test
    void testOverSizedResponseMsgAsync() throws Exception {
        ((AbstractObjectAssert)Assertions.assertThatThrownBy(() -> this.runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync)).hasCauseInstanceOf(RpcException.class).extracting(ExceptionUtils::stripExecutionException).isInstanceOf(RpcException.class)).extracting(Throwable::getMessage).satisfies(new ThrowingConsumer[]{message -> Assertions.assertThat((String)message).contains(new CharSequence[]{String.valueOf(32000)})});
    }

    @Test
    void testNormalSizedResponseMsgAsync() throws Exception {
        String message = (String)this.runRemoteMessageResponseTest(PAYLOAD, this::requestMessageAsync);
        Assertions.assertThat((String)message).isEqualTo(PAYLOAD);
    }

    @Test
    void testNormalSizedResponseMsgSync() throws Exception {
        String message = (String)this.runRemoteMessageResponseTest(PAYLOAD, MessageRpcGateway::messageSync);
        Assertions.assertThat((String)message).isEqualTo(PAYLOAD);
    }

    @Test
    void testOverSizedResponseMsgSync() throws Exception {
        Assertions.assertThatThrownBy(() -> this.runRemoteMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RpcException.class, (String)String.valueOf(32000))});
    }

    @Test
    void testLocalOverSizedResponseMsgSync() throws Exception {
        String message = (String)this.runLocalMessageResponseTest(OVERSIZED_PAYLOAD, MessageRpcGateway::messageSync);
        Assertions.assertThat((String)message).isEqualTo(OVERSIZED_PAYLOAD);
    }

    @Test
    void testLocalOverSizedResponseMsgAsync() throws Exception {
        String message = (String)this.runLocalMessageResponseTest(OVERSIZED_PAYLOAD, this::requestMessageAsync);
        Assertions.assertThat((String)message).isEqualTo(OVERSIZED_PAYLOAD);
    }

    private String requestMessageAsync(MessageRpcGateway messageRpcGateway) throws Exception {
        CompletableFuture<String> messageFuture = messageRpcGateway.messageAsync();
        return messageFuture.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T runRemoteMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
        Object object;
        MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
        try {
            rpcEndpoint.start();
            MessageRpcGateway rpcGateway = (MessageRpcGateway)rpcService2.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
            object = rpcCall.apply((Object)rpcGateway);
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
        return (T)object;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <T> T runLocalMessageResponseTest(String payload, FunctionWithException<MessageRpcGateway, T, Exception> rpcCall) throws Exception {
        Object object;
        MessageRpcEndpoint rpcEndpoint = new MessageRpcEndpoint(rpcService1, payload);
        try {
            rpcEndpoint.start();
            MessageRpcGateway rpcGateway = (MessageRpcGateway)rpcService1.connect(rpcEndpoint.getAddress(), MessageRpcGateway.class).get();
            object = rpcCall.apply((Object)rpcGateway);
        }
        catch (Throwable throwable) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
            throw throwable;
        }
        RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{rpcEndpoint});
        return (T)object;
    }

    static interface MessageRpcGateway
    extends RpcGateway {
        public CompletableFuture<String> messageAsync();

        public String messageSync() throws RpcException;
    }

    static class MessageRpcEndpoint
    extends RpcEndpoint
    implements MessageRpcGateway {
        @Nonnull
        private final String message;

        MessageRpcEndpoint(RpcService rpcService, @Nonnull String message) {
            super(rpcService);
            this.message = message;
        }

        @Override
        public CompletableFuture<String> messageAsync() {
            return CompletableFuture.completedFuture(this.message);
        }

        @Override
        public String messageSync() throws RpcException {
            return this.message;
        }
    }
}

