/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ActorSystemBootstrapToolsTest {
    private static final Logger LOG = LoggerFactory.getLogger(ActorSystemBootstrapToolsTest.class);

    ActorSystemBootstrapToolsTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentActorSystemCreation() throws Exception {
        int concurrentCreations = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
        try {
            List actorSystemFutures = IntStream.range(0, 10).mapToObj(ignored -> CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> {
                cyclicBarrier.await();
                return ActorSystemBootstrapTools.startRemoteActorSystem((Configuration)new Configuration(), (String)"localhost", (String)"0", (Logger)LOG);
            }), executorService)).map(actorSystemFuture -> actorSystemFuture.thenCompose(PekkoUtils::terminateActorSystem)).collect(Collectors.toList());
            FutureUtils.completeAll(actorSystemFutures).get();
        }
        catch (Throwable throwable) {
            ExecutorUtils.gracefulShutdown((long)10000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{executorService});
            throw throwable;
        }
        ExecutorUtils.gracefulShutdown((long)10000L, (TimeUnit)TimeUnit.MILLISECONDS, (ExecutorService[])new ExecutorService[]{executorService});
    }

    @Test
    void testActorSystemInstantiationFailureWhenPortOccupied() throws Exception {
        try (ServerSocket portOccupier = new ServerSocket(0, 10, InetAddress.getByName("0.0.0.0"));){
            int port = portOccupier.getLocalPort();
            Assertions.assertThatThrownBy(() -> ActorSystemBootstrapTools.startRemoteActorSystem((Configuration)new Configuration(), (String)"0.0.0.0", (String)String.valueOf(port), (Logger)LOG)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(BindException.class)});
        }
    }
}

