/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.scheduler.adaptive.StateTransitionManager;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateTransitionManager} implementation that is organized as a small state machine of
 * {@link Phase}s: {@link Cooldown}, {@link Idling}, {@link Stabilizing}, {@link Stabilized} and
 * {@link Transitioning}.
 *
 * <p>Incoming {@link #onChange()} and {@link #onTrigger()} events are delegated to the phase that
 * is current at the time of the call. Phases move the manager forward through the private
 * {@code progressTo...} methods, and they register delayed callbacks through
 * {@link #scheduleFromNow(Runnable, Duration, Phase)}. A delayed callback is only executed if the
 * phase that registered it is still the current phase when the callback fires.
 *
 * <p>The class is annotated {@link NotThreadSafe}; it performs no synchronization of its own.
 */
@NotThreadSafe
public class DefaultStateTransitionManager implements StateTransitionManager {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultStateTransitionManager.class);

    /** Source of the current time; every phase reads "now" through this supplier. */
    private final Supplier<Temporal> clock;

    /** Callback interface used for resource checks, scheduling and the actual transition. */
    private final StateTransitionManager.Context transitionContext;

    /** The phase that currently handles incoming events. */
    private Phase phase;

    /** Every future handed out by the context's scheduler; cancelled in {@link #close()}. */
    private final List<ScheduledFuture<?>> scheduledFutures;

    private final Duration resourceStabilizationTimeout;
    private final Duration maxTriggerDelay;

    /**
     * Creates the manager and immediately enters the {@link Cooldown} phase, using the current
     * clock value as the start of the cooldown.
     *
     * @param transitionContext context used for resource checks, scheduling and transitioning
     * @param clock supplier of the current time; must not return {@code null} on the initial call
     * @param cooldownTimeout duration of the initial {@link Cooldown} phase
     * @param resourceStabilizationTimeout time after which {@link Stabilizing} moves on to
     *     {@link Stabilized}; must not be negative
     * @param maxTriggerDelay delay used for the scheduled evaluations in {@link Stabilizing} and
     *     {@link Stabilized}; must not be negative
     * @throws NullPointerException if any argument (or the initial clock value) is {@code null}
     * @throws IllegalArgumentException if one of the validated durations is negative
     */
    DefaultStateTransitionManager(
            StateTransitionManager.Context transitionContext,
            Supplier<Temporal> clock,
            Duration cooldownTimeout,
            Duration resourceStabilizationTimeout,
            Duration maxTriggerDelay) {
        this.clock = Preconditions.checkNotNull(clock);

        this.maxTriggerDelay = Preconditions.checkNotNull(maxTriggerDelay);
        Preconditions.checkArgument(
                !maxTriggerDelay.isNegative(), "Max trigger delay must not be negative");

        this.resourceStabilizationTimeout =
                Preconditions.checkNotNull(resourceStabilizationTimeout);
        Preconditions.checkArgument(
                !resourceStabilizationTimeout.isNegative(),
                "Resource stabilization timeout must not be negative");

        this.transitionContext = Preconditions.checkNotNull(transitionContext);
        this.scheduledFutures = new ArrayList<>();

        // The Cooldown constructor already schedules its timeout through this instance, so it
        // has to be created last, after transitionContext and scheduledFutures are initialized.
        final Temporal startOfCooldown = Preconditions.checkNotNull(clock.get());
        this.phase =
                new Cooldown(
                        startOfCooldown, clock, this, Preconditions.checkNotNull(cooldownTimeout));
    }

    /** Forwards a change event to the current phase. */
    @Override
    public void onChange() {
        LOG.debug(
                "OnChange event received in phase {} for job {}.", getPhase(), getJobId());
        phase.onChange();
    }

    /** Forwards a trigger event to the current phase. */
    @Override
    public void onTrigger() {
        LOG.debug(
                "OnTrigger event received in phase {} for job {}.", getPhase(), getJobId());
        phase.onTrigger();
    }

    /** Cancels every scheduled operation that was registered so far and forgets about them. */
    @Override
    public void close() {
        for (ScheduledFuture<?> future : scheduledFutures) {
            future.cancel(true);
        }
        scheduledFutures.clear();
    }

    @VisibleForTesting
    Phase getPhase() {
        return phase;
    }

    private void progressToIdling() {
        progressToPhase(new Idling(clock, this));
    }

    private void progressToStabilizing(Temporal firstChangeEventTimestamp) {
        progressToPhase(
                new Stabilizing(
                        clock,
                        this,
                        resourceStabilizationTimeout,
                        firstChangeEventTimestamp,
                        maxTriggerDelay));
    }

    private void progressToStabilized(Temporal firstChangeEventTimestamp) {
        progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay));
    }

    /**
     * Switches to the terminal {@link Transitioning} phase and then asks the context to perform
     * the transition to the subsequent state.
     */
    private void triggerTransitionToSubsequentState() {
        progressToPhase(new Transitioning(clock, this));
        transitionContext.transitionToSubsequentState();
    }

    /**
     * Replaces the current phase.
     *
     * @throws IllegalStateException if the manager is already in the {@link Transitioning} phase
     */
    private void progressToPhase(Phase newPhase) {
        Preconditions.checkState(
                !(phase instanceof Transitioning),
                "The state transition operation has already been triggered.");
        LOG.info("Transitioning from {} to {}, job {}.", phase, newPhase, getJobId());
        phase = newPhase;
    }

    /**
     * Schedules {@code callback} to run after {@code delay}, guarded so that it only runs while
     * {@code phase} is still the current phase. The resulting future is remembered so that
     * {@link #close()} can cancel it.
     */
    @VisibleForTesting
    void scheduleFromNow(Runnable callback, Duration delay, Phase phase) {
        final ScheduledFuture<?> scheduled =
                transitionContext.scheduleOperation(() -> runIfPhase(phase, callback), delay);
        scheduledFutures.add(scheduled);
    }

    /** Runs {@code callback} only if {@code expectedPhase} is (by identity) the current phase. */
    private void runIfPhase(Phase expectedPhase, Runnable callback) {
        if (getPhase() != expectedPhase) {
            LOG.debug(
                    "Ignoring scheduled action because expected phase {} is not the actual phase {}, job {}.",
                    expectedPhase,
                    getPhase(),
                    getJobId());
            return;
        }
        callback.run();
    }

    private JobID getJobId() {
        return transitionContext.getJobId();
    }

    /**
     * Initial phase. It remembers the time of the first change event that arrived while sufficient
     * resources were available, and decides where to go once the cooldown timeout has passed.
     */
    @VisibleForTesting
    static final class Cooldown extends Phase {
        /** Time of the first qualifying change event, or {@code null} if there was none yet. */
        @Nullable private Temporal firstChangeEventTimestamp;

        private Cooldown(
                Temporal timeOfLastRescale,
                Supplier<Temporal> clock,
                DefaultStateTransitionManager context,
                Duration cooldownTimeout) {
            super(clock, context);
            scheduleRelativelyTo(this::finalizeCooldown, timeOfLastRescale, cooldownTimeout);
        }

        @Override
        void onChange() {
            // Only the first change event observed with sufficient resources is recorded.
            if (hasSufficientResources() && firstChangeEventTimestamp == null) {
                firstChangeEventTimestamp = now();
            }
        }

        /** Moves to Idling if no change was recorded, otherwise to Stabilizing. */
        private void finalizeCooldown() {
            if (firstChangeEventTimestamp == null) {
                context().progressToIdling();
            } else {
                context().progressToStabilizing(firstChangeEventTimestamp);
            }
        }
    }

    /**
     * Base class of all phases. Provides access to the clock and the owning manager, scheduling
     * helpers, and no-op default handlers for change and trigger events.
     */
    @VisibleForTesting
    abstract static class Phase {
        private final Supplier<Temporal> clock;
        private final DefaultStateTransitionManager context;

        @VisibleForTesting
        Phase(Supplier<Temporal> clock, DefaultStateTransitionManager context) {
            this.clock = clock;
            this.context = context;
        }

        Temporal now() {
            return clock.get();
        }

        DefaultStateTransitionManager context() {
            return context;
        }

        /**
         * Schedules {@code callback} so that it fires {@code timeout} after
         * {@code startOfTimeout}. If that point in time has already passed, the callback is
         * scheduled with a zero delay.
         *
         * @throws IllegalArgumentException if {@code startOfTimeout} lies after the current time
         */
        void scheduleRelativelyTo(Runnable callback, Temporal startOfTimeout, Duration timeout) {
            final Duration passedTimeout = Duration.between(startOfTimeout, now());
            // Flink's Preconditions substitutes %s placeholders only (not SLF4J-style {}).
            Preconditions.checkArgument(
                    !passedTimeout.isNegative(),
                    "The startOfTimeout (%s) should be in the past but is after the current time.",
                    startOfTimeout);

            final Duration timeoutLeft = timeout.minus(passedTimeout);
            scheduleFromNow(callback, timeoutLeft.isNegative() ? Duration.ZERO : timeoutLeft);
        }

        /** Schedules {@code callback} after {@code delay}, bound to this phase instance. */
        void scheduleFromNow(Runnable callback, Duration delay) {
            context.scheduleFromNow(callback, delay, this);
        }

        boolean hasDesiredResources() {
            return context.transitionContext.hasDesiredResources();
        }

        boolean hasSufficientResources() {
            return context.transitionContext.hasSufficientResources();
        }

        /** Handles a change event; does nothing by default. */
        void onChange() {}

        /** Handles a trigger event; does nothing by default. */
        void onTrigger() {}

        @Override
        public String toString() {
            return getClass().getSimpleName();
        }

        JobID getJobId() {
            return context.getJobId();
        }
    }

    /** Phase that waits for a change event which comes with sufficient resources. */
    @VisibleForTesting
    static final class Idling extends Phase {
        private Idling(Supplier<Temporal> clock, DefaultStateTransitionManager context) {
            super(clock, context);
        }

        @Override
        void onChange() {
            if (hasSufficientResources()) {
                context().progressToStabilizing(now());
            }
        }
    }

    /**
     * Phase in which the manager transitions as soon as the desired resources are available. The
     * check happens on trigger events and in evaluations that are scheduled
     * {@code maxTriggerDelay} after the most recent change event. Independently of that, the
     * phase moves on to {@link Stabilized} once the resource stabilization timeout (measured from
     * the first change event) has passed.
     */
    @VisibleForTesting
    static final class Stabilizing extends Phase {
        /** Time of the most recent change event seen by this phase. */
        private Temporal onChangeEventTimestamp;

        private final Duration maxTriggerDelay;

        /** Guards against having more than one pending evaluation at a time. */
        private boolean evaluationScheduled = false;

        private Stabilizing(
                Supplier<Temporal> clock,
                DefaultStateTransitionManager context,
                Duration resourceStabilizationTimeout,
                Temporal firstOnChangeEventTimestamp,
                Duration maxTriggerDelay) {
            super(clock, context);
            this.onChangeEventTimestamp = firstOnChangeEventTimestamp;
            this.maxTriggerDelay = maxTriggerDelay;

            scheduleRelativelyTo(
                    () -> context().progressToStabilized(firstOnChangeEventTimestamp),
                    firstOnChangeEventTimestamp,
                    resourceStabilizationTimeout);
            scheduleTransitionEvaluation();
        }

        @Override
        void onChange() {
            onChangeEventTimestamp = now();
            scheduleTransitionEvaluation();
        }

        @Override
        void onTrigger() {
            transitionToSubSequentStateForDesiredResources();
        }

        /**
         * Schedules a desired-resources evaluation relative to the latest change event, unless
         * one is already pending. The pending flag is reset when the evaluation runs.
         */
        private void scheduleTransitionEvaluation() {
            if (evaluationScheduled) {
                return;
            }
            evaluationScheduled = true;
            scheduleRelativelyTo(
                    () -> {
                        evaluationScheduled = false;
                        transitionToSubSequentStateForDesiredResources();
                    },
                    onChangeEventTimestamp,
                    maxTriggerDelay);
        }

        /** Triggers the transition if the desired resources are available; otherwise no-op. */
        private void transitionToSubSequentStateForDesiredResources() {
            if (hasDesiredResources()) {
                LOG.info(
                        "Desired resources are met, transitioning to the subsequent state, job {}.",
                        getJobId());
                context().triggerTransitionToSubsequentState();
            } else {
                LOG.debug(
                        "Desired resources are not met, skipping the transition to the subsequent state, job {}.",
                        getJobId());
            }
        }
    }

    /**
     * Phase that reacts to trigger events: with sufficient resources the transition is triggered,
     * otherwise the manager falls back to {@link Idling}. A trigger is also scheduled
     * {@code maxTriggerDelay} after the first change event.
     */
    @VisibleForTesting
    static final class Stabilized extends Phase {
        private Stabilized(
                Supplier<Temporal> clock,
                DefaultStateTransitionManager context,
                Temporal firstChangeEventTimestamp,
                Duration maxTriggerDelay) {
            super(clock, context);
            scheduleRelativelyTo(
                    () -> {
                        LOG.info(
                                "Scheduled onTrigger event fired in Stabilized phase, job {}.",
                                getJobId());
                        onTrigger();
                    },
                    firstChangeEventTimestamp,
                    maxTriggerDelay);
        }

        @Override
        void onTrigger() {
            if (hasSufficientResources()) {
                LOG.info(
                        "Sufficient resources are met, progressing to subsequent state, job {}.",
                        getJobId());
                context().triggerTransitionToSubsequentState();
            } else {
                LOG.debug(
                        "Sufficient resources are not met, progressing to idling, job {}.",
                        getJobId());
                context().progressToIdling();
            }
        }
    }

    /**
     * Terminal phase entered right before the context is asked to transition. It ignores all
     * events, and any further phase change is rejected by the owning manager.
     */
    @VisibleForTesting
    static final class Transitioning extends Phase {
        private Transitioning(Supplier<Temporal> clock, DefaultStateTransitionManager context) {
            super(clock, context);
        }
    }
}

