/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import org.apache.flink.runtime.rpc.pekko.ActorSystemExtension;
import org.apache.flink.runtime.rpc.pekko.SupervisorActor;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class SupervisorActorTest {
    @RegisterExtension
    private final ActorSystemExtension actorSystemExtension = ActorSystemExtension.defaultConfiguration();

    SupervisorActorTest() {
    }

    @Test
    void completesTerminationFutureIfActorStops() {
        ActorSystem actorSystem = this.actorSystemExtension.getActorSystem();
        ActorRef supervisor = SupervisorActor.startSupervisorActor((ActorSystem)actorSystem, (Executor)actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration actorRegistration = this.startRpcActor(supervisor, "foobar");
        CompletableFuture terminationFuture = actorRegistration.getTerminationFuture();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        actorRegistration.getActorRef().tell((Object)TerminateWithFutureCompletion.normal(), ActorRef.noSender());
        terminationFuture.join();
    }

    @Test
    void completesTerminationFutureExceptionallyIfActorStopsExceptionally() throws Exception {
        ActorSystem actorSystem = this.actorSystemExtension.getActorSystem();
        ActorRef supervisor = SupervisorActor.startSupervisorActor((ActorSystem)actorSystem, (Executor)actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration actorRegistration = this.startRpcActor(supervisor, "foobar");
        CompletableFuture terminationFuture = actorRegistration.getTerminationFuture();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        FlinkException cause = new FlinkException("Test cause.");
        actorRegistration.getActorRef().tell((Object)TerminateWithFutureCompletion.exceptionally((Throwable)cause), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assertions.fail((String)"Expected the termination future being completed exceptionally");
        }
        catch (ExecutionException expected) {
            ExceptionUtils.findThrowable((Throwable)expected, e -> e.equals(cause)).orElseThrow(() -> new FlinkException("Unexpected exception", (Throwable)expected));
        }
    }

    @Test
    void completesTerminationFutureExceptionallyIfActorStopsWithoutReason() throws InterruptedException {
        ActorSystem actorSystem = this.actorSystemExtension.getActorSystem();
        ActorRef supervisor = SupervisorActor.startSupervisorActor((ActorSystem)actorSystem, (Executor)actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration actorRegistration = this.startRpcActor(supervisor, "foobar");
        CompletableFuture terminationFuture = actorRegistration.getTerminationFuture();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        actorRegistration.getActorRef().tell((Object)Terminate.INSTANCE, ActorRef.noSender());
        try {
            terminationFuture.get();
            Assertions.fail((String)"Expected the termination future being completed exceptionally");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
    }

    @Test
    void completesTerminationFutureExceptionallyIfActorFails() throws Exception {
        ActorSystem actorSystem = this.actorSystemExtension.getActorSystem();
        ActorRef supervisor = SupervisorActor.startSupervisorActor((ActorSystem)actorSystem, (Executor)actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration actorRegistration = this.startRpcActor(supervisor, "foobar");
        CompletableFuture terminationFuture = actorRegistration.getTerminationFuture();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        CompletableFuture actorSystemTerminationFuture = actorSystem.getWhenTerminated().toCompletableFuture();
        FlinkException cause = new FlinkException("Test cause.");
        actorRegistration.getActorRef().tell((Object)Fail.exceptionally((Throwable)cause), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assertions.fail((String)"Expected the termination future being completed exceptionally");
        }
        catch (ExecutionException expected) {
            ExceptionUtils.findThrowable((Throwable)expected, e -> e.equals(cause)).orElseThrow(() -> new FlinkException("Unexpected exception", (Throwable)expected));
        }
        actorSystemTerminationFuture.join();
    }

    @Test
    void completesTerminationFutureOfSiblingsIfActorFails() throws Exception {
        ActorSystem actorSystem = this.actorSystemExtension.getActorSystem();
        ActorRef supervisor = SupervisorActor.startSupervisorActor((ActorSystem)actorSystem, (Executor)actorSystem.getDispatcher());
        SupervisorActor.ActorRegistration actorRegistration1 = this.startRpcActor(supervisor, "foobar1");
        SupervisorActor.ActorRegistration actorRegistration2 = this.startRpcActor(supervisor, "foobar2");
        CompletableFuture terminationFuture = actorRegistration2.getTerminationFuture();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        FlinkException cause = new FlinkException("Test cause.");
        actorRegistration1.getActorRef().tell((Object)Fail.exceptionally((Throwable)cause), ActorRef.noSender());
        try {
            terminationFuture.get();
            Assertions.fail((String)"Expected the termination future being completed exceptionally");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
    }

    private SupervisorActor.ActorRegistration startRpcActor(ActorRef supervisor, String endpointId) {
        SupervisorActor.StartRpcActorResponse startResponse = SupervisorActor.startRpcActor((ActorRef)supervisor, terminationFuture -> Props.create(SimpleActor.class, (Object[])new Object[]{terminationFuture}), (String)endpointId);
        return startResponse.orElseThrow(cause -> new AssertionError("Expected the start to succeed.", (Throwable)cause));
    }

    private static final class TerminateWithFutureCompletion {
        @Nullable
        private final Throwable terminationError;

        private TerminateWithFutureCompletion(@Nullable Throwable terminationError) {
            this.terminationError = terminationError;
        }

        @Nullable
        private Throwable getTerminationError() {
            return this.terminationError;
        }

        private static TerminateWithFutureCompletion normal() {
            return new TerminateWithFutureCompletion(null);
        }

        private static TerminateWithFutureCompletion exceptionally(Throwable cause) {
            return new TerminateWithFutureCompletion(cause);
        }
    }

    private static final class Terminate {
        private static final Terminate INSTANCE = new Terminate();

        private Terminate() {
        }
    }

    private static final class Fail {
        private final Throwable cause;

        private Fail(Throwable cause) {
            this.cause = cause;
        }

        private Throwable getCause() {
            return this.cause;
        }

        private static Fail exceptionally(Throwable cause) {
            return new Fail(cause);
        }
    }

    private static final class SimpleActor
    extends AbstractActor {
        private final CompletableFuture<Void> terminationFuture;

        private SimpleActor(CompletableFuture<Void> terminationFuture) {
            this.terminationFuture = terminationFuture;
        }

        public AbstractActor.Receive createReceive() {
            return ReceiveBuilder.create().match(Terminate.class, this::terminate).match(TerminateWithFutureCompletion.class, this::terminateActorWithFutureCompletion).match(Fail.class, this::fail).build();
        }

        private void fail(Fail fail) {
            throw new RuntimeException(fail.getCause());
        }

        private void terminate(Terminate terminate) {
            this.terminateActor();
        }

        private void terminateActor() {
            this.getContext().stop(this.getSelf());
        }

        private void terminateActorWithFutureCompletion(TerminateWithFutureCompletion terminateWithFutureCompletion) {
            Throwable terminationError = terminateWithFutureCompletion.getTerminationError();
            if (terminationError == null) {
                this.terminationFuture.complete(null);
            } else {
                this.terminationFuture.completeExceptionally(terminationError);
            }
            this.terminateActor();
        }
    }
}

