/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.CheckpointSchedulingProvider;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.FailureResultUtil;
import org.apache.flink.runtime.scheduler.adaptive.ResourceListener;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitions;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

class Executing
extends StateWithExecutionGraph
implements ResourceListener,
StateTransitionManager.Context,
CheckpointStatsListener {
    private final Context context;
    private final StateTransitionManager stateTransitionManager;
    private final int rescaleOnFailedCheckpointCount;
    @Nullable
    private AtomicInteger failedCheckpointCountdown;

    Executing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger logger, Context context, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection, Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory, int rescaleOnFailedCheckpointCount) {
        super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger, userCodeClassLoader, failureCollection);
        this.context = context;
        Preconditions.checkState((executionGraph.getState() == JobStatus.RUNNING ? 1 : 0) != 0, (Object)"Assuming running execution graph");
        this.stateTransitionManager = stateTransitionManagerFactory.apply(this);
        Preconditions.checkArgument((rescaleOnFailedCheckpointCount > 0 ? 1 : 0) != 0, (Object)"The rescaleOnFailedCheckpointCount should be larger than 0.");
        this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
        this.failedCheckpointCountdown = null;
        this.deploy();
        context.runIfState(this, () -> {
            this.stateTransitionManager.onChange();
            this.stateTransitionManager.onTrigger();
        }, Duration.ZERO);
    }

    @Override
    public boolean hasSufficientResources() {
        return this.parallelismChanged() && this.context.hasSufficientResources();
    }

    @Override
    public boolean hasDesiredResources() {
        return this.parallelismChanged() && this.context.hasDesiredResources();
    }

    private boolean parallelismChanged() {
        VertexParallelism currentParallelism = Executing.extractCurrentVertexParallelism(this.getExecutionGraph());
        return this.context.getAvailableVertexParallelism().map(availableParallelism -> availableParallelism.getVertices().stream().anyMatch(vertex -> currentParallelism.getParallelism((JobVertexID)vertex) != availableParallelism.getParallelism((JobVertexID)vertex))).orElse(false);
    }

    private static VertexParallelism extractCurrentVertexParallelism(AccessExecutionGraph executionGraph) {
        return new VertexParallelism(executionGraph.getAllVertices().values().stream().collect(Collectors.toMap(AccessExecutionJobVertex::getJobVertexId, AccessExecutionJobVertex::getParallelism)));
    }

    @Override
    public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
        return this.context.runIfState(this, callback, delay);
    }

    @Override
    public void transitionToSubsequentState() {
        this.context.goToRestarting(this.getExecutionGraph(), this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), Duration.ofMillis(0L), this.context.getAvailableVertexParallelism().orElseThrow(() -> new IllegalStateException("Resources must be available when rescaling.")), this.getFailures());
    }

    @Override
    public JobStatus getJobStatus() {
        return JobStatus.RUNNING;
    }

    @Override
    public void cancel() {
        this.context.goToCanceling(this.getExecutionGraph(), this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), this.getFailures());
    }

    @Override
    void onFailure(Throwable cause, CompletableFuture<Map<String, String>> failureLabels) {
        FailureResultUtil.restartOrFail(this.context.howToHandleFailure(cause, failureLabels), this.context, this);
    }

    @Override
    void onGloballyTerminalState(JobStatus globallyTerminalState) {
        this.context.goToFinished(ArchivedExecutionGraph.createFrom(this.getExecutionGraph()));
    }

    @Override
    public void onLeave(Class<? extends State> newState) {
        this.stateTransitionManager.close();
        super.onLeave(newState);
    }

    private void deploy() {
        for (ExecutionJobVertex executionJobVertex : this.getExecutionGraph().getVerticesTopologically()) {
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                if (executionVertex.getExecutionState() != ExecutionState.CREATED && executionVertex.getExecutionState() != ExecutionState.SCHEDULED) continue;
                this.deploySafely(executionVertex);
            }
        }
    }

    private void deploySafely(ExecutionVertex executionVertex) {
        try {
            executionVertex.deploy();
        }
        catch (JobException e) {
            this.handleDeploymentFailure(executionVertex, e);
        }
    }

    private void handleDeploymentFailure(ExecutionVertex executionVertex, JobException e) {
        executionVertex.markFailed((Throwable)((Object)e));
    }

    @Override
    public void onNewResourcesAvailable() {
        this.stateTransitionManager.onChange();
        this.initializeFailedCheckpointCountdownIfUnset();
    }

    @Override
    public void onNewResourceRequirements() {
        this.stateTransitionManager.onChange();
        this.initializeFailedCheckpointCountdownIfUnset();
    }

    @Override
    public void onCompletedCheckpoint() {
        this.triggerPotentialRescale();
    }

    @Override
    public void onFailedCheckpoint() {
        if (this.failedCheckpointCountdown != null && this.failedCheckpointCountdown.decrementAndGet() <= 0) {
            this.triggerPotentialRescale();
        }
    }

    private void triggerPotentialRescale() {
        this.stateTransitionManager.onTrigger();
        this.failedCheckpointCountdown = null;
    }

    private void initializeFailedCheckpointCountdownIfUnset() {
        if (this.failedCheckpointCountdown == null) {
            this.failedCheckpointCountdown = new AtomicInteger(this.rescaleOnFailedCheckpointCount);
        }
    }

    CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType) {
        ExecutionGraph executionGraph = this.getExecutionGraph();
        StopWithSavepointTerminationManager.checkSavepointActionPreconditions(executionGraph.getCheckpointCoordinator(), targetDirectory, executionGraph.getJobID(), this.getLogger());
        this.getLogger().info("Triggering stop-with-savepoint for job {}.", (Object)executionGraph.getJobID());
        CheckpointSchedulingProvider schedulingProvider = new CheckpointSchedulingProvider(executionGraph);
        schedulingProvider.stopCheckpointScheduler();
        CompletionStage savepointFuture = Objects.requireNonNull(executionGraph.getCheckpointCoordinator()).triggerSynchronousSavepoint(terminate, targetDirectory, formatType).thenApply(CompletedCheckpoint::getExternalPointer);
        return this.context.goToStopWithSavepoint(executionGraph, this.getExecutionGraphHandler(), this.getOperatorCoordinatorHandler(), schedulingProvider, (CompletableFuture<String>)savepointFuture, this.getFailures());
    }

    static interface Context
    extends StateWithExecutionGraph.Context,
    StateTransitions.ToCancelling,
    StateTransitions.ToFailing,
    StateTransitions.ToRestarting,
    StateTransitions.ToStopWithSavepoint {
        public FailureResult howToHandleFailure(Throwable var1, CompletableFuture<Map<String, String>> var2);

        public Optional<VertexParallelism> getAvailableVertexParallelism();

        public ScheduledFuture<?> runIfState(State var1, Runnable var2, Duration var3);

        public boolean hasDesiredResources();

        public boolean hasSufficientResources();
    }

    static class Factory
    implements StateFactory<Executing> {
        private final Context context;
        private final Logger log;
        private final ExecutionGraph executionGraph;
        private final ExecutionGraphHandler executionGraphHandler;
        private final OperatorCoordinatorHandler operatorCoordinatorHandler;
        private final ClassLoader userCodeClassLoader;
        private final List<ExceptionHistoryEntry> failureCollection;
        private final Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory;
        private final int rescaleOnFailedCheckpointCount;

        Factory(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Logger log, Context context, ClassLoader userCodeClassLoader, List<ExceptionHistoryEntry> failureCollection, Function<StateTransitionManager.Context, StateTransitionManager> stateTransitionManagerFactory, int rescaleOnFailedCheckpointCount) {
            this.context = context;
            this.log = log;
            this.executionGraph = executionGraph;
            this.executionGraphHandler = executionGraphHandler;
            this.operatorCoordinatorHandler = operatorCoordinatorHandler;
            this.userCodeClassLoader = userCodeClassLoader;
            this.failureCollection = failureCollection;
            this.stateTransitionManagerFactory = stateTransitionManagerFactory;
            this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
        }

        @Override
        public Class<Executing> getStateClass() {
            return Executing.class;
        }

        @Override
        public Executing getState() {
            return new Executing(this.executionGraph, this.executionGraphHandler, this.operatorCoordinatorHandler, this.log, this.context, this.userCodeClassLoader, this.failureCollection, this.stateTransitionManagerFactory, this.rescaleOnFailedCheckpointCount);
        }
    }
}

