/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.PekkoException;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorContext;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ChildRestartStats;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Status;
import org.apache.pekko.actor.SupervisorStrategy;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.PartialFunction;
import scala.collection.Iterable;

class SupervisorActor
extends AbstractActor {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorActor.class);
    private final Executor terminationFutureExecutor;
    private final Map<ActorRef, RpcActorRegistration> registeredRpcActors;

    SupervisorActor(Executor terminationFutureExecutor) {
        this.terminationFutureExecutor = terminationFutureExecutor;
        this.registeredRpcActors = new HashMap<ActorRef, RpcActorRegistration>();
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return this.receiveBuilder().match(StartRpcActor.class, this::createStartRpcActorMessage).matchAny(this::handleUnknownMessage).build();
    }

    @Override
    public void postStop() throws Exception {
        LOG.debug("Stopping supervisor actor.");
        super.postStop();
        for (RpcActorRegistration actorRegistration : this.registeredRpcActors.values()) {
            this.terminateRpcActorOnStop(actorRegistration);
        }
        this.registeredRpcActors.clear();
    }

    @Override
    public SupervisorActorSupervisorStrategy supervisorStrategy() {
        return new SupervisorActorSupervisorStrategy();
    }

    private void terminateRpcActorOnStop(RpcActorRegistration rpcActorRegistration) {
        rpcActorRegistration.terminateExceptionally((Throwable)new RpcException(String.format("Unexpected closing of %s with name %s.", this.getClass().getSimpleName(), rpcActorRegistration.getEndpointId())), this.terminationFutureExecutor);
    }

    private void createStartRpcActorMessage(StartRpcActor startRpcActor) {
        String endpointId = startRpcActor.getEndpointId();
        RpcActorRegistration rpcActorRegistration = new RpcActorRegistration(endpointId);
        Props rpcActorProps = startRpcActor.getPropsFactory().create(rpcActorRegistration.getInternalTerminationFuture());
        LOG.debug("Starting {} with name {}.", (Object)rpcActorProps.actorClass().getSimpleName(), (Object)endpointId);
        try {
            ActorRef actorRef2 = this.getContext().actorOf(rpcActorProps, endpointId);
            this.registeredRpcActors.put(actorRef2, rpcActorRegistration);
            this.getSender().tell(StartRpcActorResponse.success(ActorRegistration.create(actorRef2, rpcActorRegistration.getExternalTerminationFuture())), this.getSelf());
        }
        catch (PekkoException e) {
            this.getSender().tell(StartRpcActorResponse.failure(e), this.getSelf());
        }
    }

    private void rpcActorTerminated(ActorRef actorRef2) {
        RpcActorRegistration actorRegistration = this.removeAkkaRpcActor(actorRef2);
        LOG.debug("RpcActor {} has terminated.", (Object)actorRef2.path());
        actorRegistration.terminate(this.terminationFutureExecutor);
    }

    private void rpcActorFailed(ActorRef actorRef2, Throwable cause) {
        LOG.warn("RpcActor {} has failed. Shutting it down now.", (Object)actorRef2.path(), (Object)cause);
        for (Map.Entry<ActorRef, RpcActorRegistration> registeredRpcActor : this.registeredRpcActors.entrySet()) {
            ActorRef otherActorRef = registeredRpcActor.getKey();
            if (otherActorRef.equals(actorRef2)) {
                RpcException error = new RpcException(String.format("Stopping actor %s because it failed.", actorRef2.path()), cause);
                registeredRpcActor.getValue().markFailed((Throwable)error);
                continue;
            }
            RpcException siblingException = new RpcException(String.format("Stopping actor %s because its sibling %s has failed.", otherActorRef.path(), actorRef2.path()));
            registeredRpcActor.getValue().markFailed((Throwable)siblingException);
        }
        this.getContext().getSystem().terminate();
    }

    private RpcActorRegistration removeAkkaRpcActor(ActorRef actorRef2) {
        return Optional.ofNullable(this.registeredRpcActors.remove(actorRef2)).orElseThrow(() -> new IllegalStateException(String.format("Could not find actor %s.", actorRef2.path())));
    }

    private void handleUnknownMessage(Object msg) {
        UnknownMessageException cause = new UnknownMessageException(String.format("Cannot handle unknown message %s.", msg));
        this.getSender().tell(new Status.Failure((Throwable)((Object)cause)), this.getSelf());
        throw cause;
    }

    public static String getActorName() {
        return "rpc";
    }

    public static ActorRef startSupervisorActor(ActorSystem actorSystem, Executor terminationFutureExecutor) {
        Props supervisorProps = Props.create(SupervisorActor.class, terminationFutureExecutor).withDispatcher("pekko.actor.supervisor-dispatcher");
        return actorSystem.actorOf(supervisorProps, SupervisorActor.getActorName());
    }

    public static StartRpcActorResponse startRpcActor(ActorRef supervisor, StartRpcActor.PropsFactory propsFactory, String endpointId) {
        return (StartRpcActorResponse)((CompletableFuture)Patterns.ask(supervisor, (Object)SupervisorActor.createStartRpcActorMessage(propsFactory, endpointId), RpcUtils.INF_DURATION).toCompletableFuture().thenApply(StartRpcActorResponse.class::cast)).join();
    }

    public static StartRpcActor createStartRpcActorMessage(StartRpcActor.PropsFactory propsFactory, String endpointId) {
        return StartRpcActor.create(propsFactory, endpointId);
    }

    static final class StartRpcActor {
        private final PropsFactory propsFactory;
        private final String endpointId;

        private StartRpcActor(PropsFactory propsFactory, String endpointId) {
            this.propsFactory = propsFactory;
            this.endpointId = endpointId;
        }

        public String getEndpointId() {
            return this.endpointId;
        }

        public PropsFactory getPropsFactory() {
            return this.propsFactory;
        }

        private static StartRpcActor create(PropsFactory propsFactory, String endpointId) {
            return new StartRpcActor(propsFactory, endpointId);
        }

        static interface PropsFactory {
            public Props create(CompletableFuture<Void> var1);
        }
    }

    private static final class RpcActorRegistration {
        private final String endpointId;
        private final CompletableFuture<Void> internalTerminationFuture;
        private final CompletableFuture<Void> externalTerminationFuture;
        @Nullable
        private Throwable errorCause;

        private RpcActorRegistration(String endpointId) {
            this.endpointId = endpointId;
            this.internalTerminationFuture = new CompletableFuture();
            this.externalTerminationFuture = new CompletableFuture();
            this.errorCause = null;
        }

        private CompletableFuture<Void> getInternalTerminationFuture() {
            return this.internalTerminationFuture;
        }

        private CompletableFuture<Void> getExternalTerminationFuture() {
            return this.externalTerminationFuture;
        }

        private String getEndpointId() {
            return this.endpointId;
        }

        private void terminate(Executor terminationFutureExecutor) {
            CompletionStage<Void> terminationFuture = this.internalTerminationFuture;
            if (this.errorCause != null) {
                if (!this.internalTerminationFuture.completeExceptionally(this.errorCause)) {
                    terminationFuture = this.internalTerminationFuture.handle((ignored, throwable) -> {
                        if (throwable != null) {
                            this.errorCause.addSuppressed((Throwable)throwable);
                        }
                        throw new CompletionException(this.errorCause);
                    });
                }
            } else {
                this.internalTerminationFuture.completeExceptionally((Throwable)new RpcException(String.format("RpcEndpoint %s did not complete the internal termination future.", this.endpointId)));
            }
            FutureUtils.forwardAsync(terminationFuture, this.externalTerminationFuture, (Executor)terminationFutureExecutor);
        }

        private void terminateExceptionally(Throwable cause, Executor terminationFutureExecutor) {
            terminationFutureExecutor.execute(() -> this.externalTerminationFuture.completeExceptionally(cause));
        }

        public void markFailed(Throwable cause) {
            if (this.errorCause == null) {
                this.errorCause = cause;
            } else {
                this.errorCause.addSuppressed(cause);
            }
        }
    }

    private final class SupervisorActorSupervisorStrategy
    extends SupervisorStrategy {
        private SupervisorActorSupervisorStrategy() {
        }

        @Override
        public PartialFunction<Throwable, SupervisorStrategy.Directive> decider() {
            return DeciderBuilder.match(Exception.class, e -> SupervisorStrategy.stop()).build();
        }

        @Override
        public boolean loggingEnabled() {
            return false;
        }

        @Override
        public void handleChildTerminated(ActorContext context, ActorRef child, Iterable<ActorRef> children) {
            SupervisorActor.this.rpcActorTerminated(child);
        }

        @Override
        public void processFailure(ActorContext context, boolean restart, ActorRef child, Throwable cause, ChildRestartStats stats, Iterable<ChildRestartStats> children) {
            Preconditions.checkArgument((!restart ? 1 : 0) != 0, (Object)"The supervisor strategy should never restart an actor.");
            SupervisorActor.this.rpcActorFailed(child, cause);
        }
    }

    static final class ActorRegistration {
        private final ActorRef actorRef;
        private final CompletableFuture<Void> terminationFuture;

        private ActorRegistration(ActorRef actorRef2, CompletableFuture<Void> terminationFuture) {
            this.actorRef = actorRef2;
            this.terminationFuture = terminationFuture;
        }

        public ActorRef getActorRef() {
            return this.actorRef;
        }

        public CompletableFuture<Void> getTerminationFuture() {
            return this.terminationFuture;
        }

        public static ActorRegistration create(ActorRef actorRef2, CompletableFuture<Void> terminationFuture) {
            return new ActorRegistration(actorRef2, terminationFuture);
        }
    }

    static final class StartRpcActorResponse {
        @Nullable
        private final ActorRegistration actorRegistration;
        @Nullable
        private final Throwable error;

        private StartRpcActorResponse(@Nullable ActorRegistration actorRegistration, @Nullable Throwable error) {
            this.actorRegistration = actorRegistration;
            this.error = error;
        }

        public <X extends Throwable> ActorRegistration orElseThrow(Function<? super Throwable, ? extends X> throwableFunction) throws X {
            if (this.actorRegistration != null) {
                return this.actorRegistration;
            }
            throw (Throwable)throwableFunction.apply(this.error);
        }

        public static StartRpcActorResponse success(ActorRegistration actorRegistration) {
            return new StartRpcActorResponse(actorRegistration, null);
        }

        public static StartRpcActorResponse failure(Throwable error) {
            return new StartRpcActorResponse(null, error);
        }
    }
}

