/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class PekkoUtilsTest {
    PekkoUtilsTest() {
    }

    @Test
    void getHostFromRpcURLForRemoteRpcURL() throws Exception {
        String host = "127.0.0.1";
        int port = 1234;
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 1234);
        String remoteRpcURL = PekkoRpcServiceUtils.getRpcUrl((String)"127.0.0.1", (int)1234, (String)"actor", (AddressResolution)AddressResolution.NO_ADDRESS_RESOLUTION, (PekkoRpcServiceUtils.Protocol)PekkoRpcServiceUtils.Protocol.TCP);
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)remoteRpcURL);
        Assertions.assertThat((Object)result).isEqualTo((Object)address);
    }

    @Test
    void getHostFromRpcURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception {
        String localRpcURL = "pekko://flink/user/actor";
        Assertions.assertThatThrownBy(() -> PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko://flink/user/actor")).isInstanceOf(Exception.class);
    }

    @Test
    void getHostFromRpcURLReturnsHostAfterAtSign() throws Exception {
        String url = "pekko.tcp://flink@localhost:1234/user/jobmanager";
        InetSocketAddress expected = new InetSocketAddress("localhost", 1234);
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko.tcp://flink@localhost:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)expected);
    }

    @Test
    void getHostFromRpcURLHandlesAkkaTcpProtocol() throws Exception {
        String url = "pekko.tcp://flink@localhost:1234/user/jobmanager";
        InetSocketAddress expected = new InetSocketAddress("localhost", 1234);
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko.tcp://flink@localhost:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)expected);
    }

    @Test
    void getHostFromRpcURLHandlesAkkaSslTcpProtocol() throws Exception {
        String url = "pekko.ssl.tcp://flink@localhost:1234/user/jobmanager";
        InetSocketAddress expected = new InetSocketAddress("localhost", 1234);
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko.ssl.tcp://flink@localhost:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)expected);
    }

    @Test
    void getHostFromRpcURLHandlesIPv4Addresses() throws Exception {
        String ipv4Address = "192.168.0.1";
        int port = 1234;
        InetSocketAddress address = new InetSocketAddress("192.168.0.1", 1234);
        String url = "pekko://flink@192.168.0.1:1234/user/jobmanager";
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko://flink@192.168.0.1:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)address);
    }

    @Test
    void getHostFromRpcURLHandlesIPv6Addresses() throws Exception {
        String ipv6Address = "2001:db8:10:11:12:ff00:42:8329";
        int port = 1234;
        InetSocketAddress address = new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234);
        String url = "pekko://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager";
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)address);
    }

    @Test
    void getHostFromRpcURLHandlesIPv6AddressesTcp() throws Exception {
        String ipv6Address = "2001:db8:10:11:12:ff00:42:8329";
        int port = 1234;
        InetSocketAddress address = new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234);
        String url = "pekko.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager";
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)address);
    }

    @Test
    void getHostFromRpcURLHandlesIPv6AddressesSsl() throws Exception {
        String ipv6Address = "2001:db8:10:11:12:ff00:42:8329";
        int port = 1234;
        InetSocketAddress address = new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234);
        String url = "pekko.ssl.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager";
        InetSocketAddress result = PekkoUtils.getInetSocketAddressFromRpcURL((String)"pekko.ssl.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager");
        Assertions.assertThat((Object)result).isEqualTo((Object)address);
    }

    @Test
    void getConfigNormalizesHostName() {
        Configuration configuration = new Configuration();
        String hostname = "AbC123foOBaR";
        int port = 1234;
        Config config = PekkoUtils.getConfig((Configuration)configuration, (HostAndPort)new HostAndPort("AbC123foOBaR", 1234));
        Assertions.assertThat((String)config.getString("pekko.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString((String)"AbC123foOBaR"));
    }

    @Test
    void getConfigDefaultsToLocalHost() throws UnknownHostException {
        Config config = PekkoUtils.getConfig((Configuration)new Configuration(), (HostAndPort)new HostAndPort("", 0));
        String hostname = config.getString("pekko.remote.classic.netty.tcp.hostname");
        Assertions.assertThat((boolean)InetAddress.getByName(hostname).isLoopbackAddress()).isTrue();
    }

    @Test
    void getConfigDefaultsToForkJoinExecutor() {
        Config config = PekkoUtils.getConfig((Configuration)new Configuration(), null);
        Assertions.assertThat((String)config.getString("pekko.actor.default-dispatcher.executor")).isEqualTo("fork-join-executor");
    }

    @Test
    void getConfigDefaultsToRemoteForkJoinExecutor() {
        Config config = PekkoUtils.getConfig((Configuration)new Configuration(), (HostAndPort)new HostAndPort("localhost", 1234));
        Assertions.assertThat((String)config.getString("pekko.remote.default-remote-dispatcher.executor")).isEqualTo("fork-join-executor");
    }

    @Test
    void getConfigSetsExecutorWithThreadPriority() {
        int threadPriority = 3;
        boolean minThreads = true;
        int maxThreads = 3;
        Config config = PekkoUtils.getConfig((Configuration)new Configuration(), (HostAndPort)new HostAndPort("localhost", 1234), null, (Config)PekkoUtils.getThreadPoolExecutorConfig((RpcSystem.FixedThreadPoolExecutorConfiguration)new RpcSystem.FixedThreadPoolExecutorConfiguration(1, 3, 3)));
        Assertions.assertThat((String)config.getString("pekko.actor.default-dispatcher.executor")).isEqualTo("thread-pool-executor");
        Assertions.assertThat((int)config.getInt("pekko.actor.default-dispatcher.thread-priority")).isEqualTo(3);
        Assertions.assertThat((int)config.getInt("pekko.actor.default-dispatcher.thread-pool-executor.core-pool-size-min")).isEqualTo(1);
        Assertions.assertThat((int)config.getInt("pekko.actor.default-dispatcher.thread-pool-executor.core-pool-size-max")).isEqualTo(3);
    }

    @Test
    void getConfigHandlesIPv6Address() {
        String ipv6AddressString = "2001:db8:10:11:12:ff00:42:8329";
        Config config = PekkoUtils.getConfig((Configuration)new Configuration(), (HostAndPort)new HostAndPort("2001:db8:10:11:12:ff00:42:8329", 1234));
        Assertions.assertThat((String)config.getString("pekko.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString((String)"2001:db8:10:11:12:ff00:42:8329"));
    }

    @Test
    void getConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
        Configuration configuration = new Configuration();
        configuration.set(RpcOptions.ASK_TIMEOUT_DURATION, (Object)Duration.ofMillis(100L));
        Config config = PekkoUtils.getConfig((Configuration)configuration, (HostAndPort)new HostAndPort("localhost", 31337));
        Assertions.assertThat((String)config.getString("pekko.remote.startup-timeout")).isEqualTo("1000ms");
    }

    @Test
    void getConfigSslEngineProviderWithoutCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.set(SecurityOptions.SSL_INTERNAL_ENABLED, (Object)true);
        Config config = PekkoUtils.getConfig((Configuration)configuration, (HostAndPort)new HostAndPort("localhost", 31337));
        Config sslConfig = config.getConfig("pekko.remote.classic.netty.ssl");
        Assertions.assertThat((String)sslConfig.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider");
        Assertions.assertThat((List)sslConfig.getStringList("security.cert-fingerprints")).isEmpty();
    }

    @Test
    void getConfigSslEngineProviderWithCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.set(SecurityOptions.SSL_INTERNAL_ENABLED, (Object)true);
        String fingerprint = "A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36";
        configuration.set(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, (Object)"A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36");
        Config config = PekkoUtils.getConfig((Configuration)configuration, (HostAndPort)new HostAndPort("localhost", 31337));
        Config sslConfig = config.getConfig("pekko.remote.classic.netty.ssl");
        Assertions.assertThat((String)sslConfig.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider");
        Assertions.assertThat((List)sslConfig.getStringList("security.cert-fingerprints")).contains((Object[])new String[]{"A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36"});
    }

    @Test
    void getConfigCustomKeyOrTruststoreType() {
        Configuration configuration = new Configuration();
        configuration.set(SecurityOptions.SSL_INTERNAL_ENABLED, (Object)true);
        configuration.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE, (Object)"JKS");
        configuration.set(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE, (Object)"JKS");
        Config config = PekkoUtils.getConfig((Configuration)configuration, (HostAndPort)new HostAndPort("localhost", 31337));
        Config securityConfig = config.getConfig("pekko.remote.classic.netty.ssl.security");
        Assertions.assertThat((String)securityConfig.getString("key-store-type")).isEqualTo("JKS");
        Assertions.assertThat((String)securityConfig.getString("trust-store-type")).isEqualTo("JKS");
    }
}

