/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blocklist.BlocklistOperations;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.IndexRange;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy;
import org.apache.flink.runtime.executiongraph.ParallelismAndInputInfos;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.VertexInputInfoComputationUtils;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.AllToAllBlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingInputInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultSpeculativeExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.DummySpeculativeExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.ExecutionPlanSchedulingContext;
import org.apache.flink.runtime.scheduler.adaptivebatch.JobGraphUpdateListener;
import org.apache.flink.runtime.scheduler.adaptivebatch.PointwiseBlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.SpeculativeExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

public class AdaptiveBatchScheduler
extends DefaultScheduler
implements JobGraphUpdateListener {
    private DefaultLogicalTopology logicalTopology;
    private final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider;
    private final Map<IntermediateDataSetID, BlockingResultInfo> blockingResultInfos;
    private final JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint;
    private final Map<JobVertexID, CompletableFuture<Integer>> sourceParallelismFuturesByJobVertexId;
    private final SpeculativeExecutionHandler speculativeExecutionHandler;
    private Set<JobVertexID> jobVerticesWithUnRecoveredCoordinators = new HashSet<JobVertexID>();
    private final BatchJobRecoveryHandler jobRecoveryHandler;
    private final AdaptiveExecutionHandler adaptiveExecutionHandler;
    private final int defaultMaxParallelism;

    public AdaptiveBatchScheduler(Logger log, AdaptiveExecutionHandler adaptiveExecutionHandler, Executor ioExecutor, Configuration jobMasterConfiguration, Consumer<ComponentMainThreadExecutor> startUpAction, ScheduledExecutor delayExecutor, ClassLoader userCodeLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory failoverStrategyFactory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionOperations executionOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Duration rpcTimeout, VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider, int defaultMaxParallelism, BlocklistOperations blocklistOperations, JobManagerOptions.HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, BatchJobRecoveryHandler jobRecoveryHandler, ExecutionPlanSchedulingContext executionPlanSchedulingContext) throws Exception {
        super(log, adaptiveExecutionHandler.getJobGraph(), ioExecutor, jobMasterConfiguration, startUpAction, delayExecutor, userCodeLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, failoverStrategyFactory, restartBackoffTimeStrategy, executionOperations, executionVertexVersioner, executionSlotAllocatorFactory, initializationTimestamp, mainThreadExecutor, jobStatusListener, failureEnrichers, executionGraphFactory, shuffleMaster, rpcTimeout, AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(adaptiveExecutionHandler.getJobGraph().getVertices(), defaultMaxParallelism), new DefaultExecutionDeployer.Factory(), executionPlanSchedulingContext);
        this.adaptiveExecutionHandler = (AdaptiveExecutionHandler)Preconditions.checkNotNull((Object)adaptiveExecutionHandler);
        adaptiveExecutionHandler.registerJobGraphUpdateListener(this);
        this.defaultMaxParallelism = defaultMaxParallelism;
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(this.getJobGraph());
        this.vertexParallelismAndInputInfosDecider = (VertexParallelismAndInputInfosDecider)Preconditions.checkNotNull((Object)vertexParallelismAndInputInfosDecider);
        this.blockingResultInfos = new HashMap<IntermediateDataSetID, BlockingResultInfo>();
        this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint;
        this.sourceParallelismFuturesByJobVertexId = new HashMap<JobVertexID, CompletableFuture<Integer>>();
        this.speculativeExecutionHandler = this.createSpeculativeExecutionHandler(log, jobMasterConfiguration, executionVertexVersioner, blocklistOperations);
        this.jobRecoveryHandler = jobRecoveryHandler;
    }

    private SpeculativeExecutionHandler createSpeculativeExecutionHandler(Logger log, Configuration jobMasterConfiguration, ExecutionVertexVersioner executionVertexVersioner, BlocklistOperations blocklistOperations) {
        if (((Boolean)jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED)).booleanValue()) {
            return new DefaultSpeculativeExecutionHandler(jobMasterConfiguration, blocklistOperations, this::getExecutionVertex, () -> this.getExecutionGraph().getRegisteredExecutions(), (newSpeculativeExecutions, verticesToDeploy) -> this.executionDeployer.allocateSlotsAndDeploy((List<Execution>)newSpeculativeExecutions, executionVertexVersioner.getExecutionVertexVersions((Collection<ExecutionVertexID>)verticesToDeploy)), log);
        }
        return new DummySpeculativeExecutionHandler();
    }

    @Override
    public void onNewJobVerticesAdded(List<JobVertex> newVertices, int pendingOperatorsCount) throws Exception {
        this.log.info("Received newly created job vertices: [{}]", newVertices);
        VertexParallelismStore vertexParallelismStore = AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph(newVertices, this.defaultMaxParallelism);
        DefaultExecutionGraphBuilder.initJobVerticesOnMaster(newVertices, this.getUserCodeLoader(), this.log, vertexParallelismStore, this.getJobGraph().getName(), this.getJobGraph().getJobID());
        this.getExecutionGraph().addNewJobVertices(newVertices, this.jobManagerJobMetricGroup, vertexParallelismStore);
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(this.getJobGraph());
        this.getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(this.getJobGraph()));
        for (JobVertex newVertex : newVertices) {
            for (JobEdge input : newVertex.getInputs()) {
                this.tryUpdateResultInfo(input.getSourceId(), input.getDistributionPattern());
            }
        }
    }

    @Override
    protected void startSchedulingInternal() {
        this.speculativeExecutionHandler.init(this.getExecutionGraph(), this.getMainThreadExecutor(), this.jobManagerJobMetricGroup);
        this.jobRecoveryHandler.initialize(new DefaultBatchJobRecoveryContext());
        if (this.jobRecoveryHandler.needRecover()) {
            this.jobRecoveryHandler.startRecovering();
        } else {
            this.tryComputeSourceParallelismThenRunAsync((value, throwable) -> {
                if (this.getExecutionGraph().getState() == JobStatus.CREATED) {
                    this.initializeVerticesIfPossible();
                    super.startSchedulingInternal();
                }
            });
        }
    }

    @Override
    protected void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        FailureHandlingResult wrappedResult = failureHandlingResult;
        if (failureHandlingResult.canRestart()) {
            Set<ExecutionVertexID> originalNeedToRestartVertices = failureHandlingResult.getVerticesToRestart();
            Set extraNeedToRestartJobVertices = originalNeedToRestartVertices.stream().map(ExecutionVertexID::getJobVertexId).filter(this.jobVerticesWithUnRecoveredCoordinators::contains).collect(Collectors.toSet());
            this.jobVerticesWithUnRecoveredCoordinators.removeAll(extraNeedToRestartJobVertices);
            Set<ExecutionVertexID> needToRestartVertices = extraNeedToRestartJobVertices.stream().flatMap(jobVertexId -> {
                ExecutionJobVertex jobVertex = this.getExecutionJobVertex((JobVertexID)jobVertexId);
                return Arrays.stream(jobVertex.getTaskVertices()).map(ExecutionVertex::getID);
            }).collect(Collectors.toSet());
            needToRestartVertices.addAll(originalNeedToRestartVertices);
            wrappedResult = FailureHandlingResult.restartable(failureHandlingResult.getFailedExecution().orElse(null), failureHandlingResult.getError(), failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels(), needToRestartVertices, failureHandlingResult.getRestartDelayMS(), failureHandlingResult.isGlobalFailure(), failureHandlingResult.isRootCause());
        }
        super.maybeRestartTasks(wrappedResult);
    }

    @VisibleForTesting
    boolean isRecovering() {
        return this.jobRecoveryHandler.isRecovering();
    }

    @Override
    protected void resetForNewExecutions(Collection<ExecutionVertexID> vertices) {
        super.resetForNewExecutions(vertices);
        if (!this.isRecovering()) {
            this.jobRecoveryHandler.onExecutionVertexReset(vertices);
        }
    }

    private void initializeJobVertex(ExecutionJobVertex jobVertex, int parallelism, Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos, long createTimestamp) throws JobException {
        if (!jobVertex.isParallelismDecided()) {
            this.changeJobVertexParallelism(jobVertex, parallelism);
        } else {
            Preconditions.checkState((parallelism == jobVertex.getParallelism() ? 1 : 0) != 0);
        }
        Preconditions.checkState((boolean)this.canInitialize(jobVertex));
        this.getExecutionGraph().initializeJobVertex(jobVertex, createTimestamp, jobVertexInputInfos);
        if (!this.isRecovering()) {
            this.jobRecoveryHandler.onExecutionJobVertexInitialization(jobVertex.getJobVertex().getID(), parallelism, jobVertexInputInfos);
        }
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        this.jobRecoveryHandler.stop(this.requestJobStatus().isGloballyTerminalState());
        this.speculativeExecutionHandler.stopSlowTaskDetector();
        return super.closeAsync();
    }

    @Override
    protected void onTaskFinished(Execution execution, IOMetrics ioMetrics) {
        this.speculativeExecutionHandler.notifyTaskFinished(execution, this::cancelPendingExecutions);
        if (!this.isRecovering()) {
            this.jobRecoveryHandler.onExecutionFinished(execution.getVertex().getID());
        }
        Preconditions.checkNotNull((Object)ioMetrics);
        this.updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
        this.notifyJobVertexFinishedIfPossible(execution.getVertex().getJobVertex());
        ExecutionVertexVersion currentVersion = this.executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID());
        this.tryComputeSourceParallelismThenRunAsync((value, throwable) -> {
            if (this.executionVertexVersioner.isModified(currentVersion)) {
                this.log.debug("Initialization of vertices will be skipped, because the execution vertex version has been modified.");
                return;
            }
            this.initializeVerticesIfPossible();
            super.onTaskFinished(execution, ioMetrics);
        });
    }

    private CompletableFuture<?> cancelPendingExecutions(ExecutionVertexID executionVertexId) {
        List pendingExecutions = this.getExecutionVertex(executionVertexId).getCurrentExecutions().stream().filter(e -> !e.getState().isTerminal() && e.getState() != ExecutionState.CANCELING).collect(Collectors.toList());
        if (pendingExecutions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.log.info("Canceling {} un-finished executions of {} because one of its executions has finished.", (Object)pendingExecutions.size(), (Object)executionVertexId);
        FutureUtils.ConjunctFuture future = FutureUtils.combineAll((Collection)pendingExecutions.stream().map(x$0 -> this.cancelExecution((Execution)x$0)).collect(Collectors.toList()));
        this.cancelAllPendingSlotRequestsForVertex(executionVertexId);
        return future;
    }

    @Override
    protected void onTaskFailed(Execution execution) {
        this.speculativeExecutionHandler.notifyTaskFailed(execution);
        super.onTaskFailed(execution);
    }

    @Override
    protected void handleTaskFailure(Execution failedExecution, @Nullable Throwable error) {
        if (!this.speculativeExecutionHandler.handleTaskFailure(failedExecution, error, this::handleLocalExecutionAttemptFailure)) {
            super.handleTaskFailure(failedExecution, error);
        }
    }

    private void handleLocalExecutionAttemptFailure(Execution failedExecution, @Nullable Throwable error) {
        this.executionSlotAllocator.cancel(failedExecution.getAttemptId());
        FailureHandlingResult failureHandlingResult = this.recordTaskFailure(failedExecution, error);
        if (failureHandlingResult.canRestart()) {
            this.archiveFromFailureHandlingResult(this.createFailureHandlingResultSnapshot(failureHandlingResult));
        } else {
            this.failJob(error, failureHandlingResult.getTimestamp(), failureHandlingResult.getFailureLabels());
        }
    }

    private void updateResultPartitionBytesMetrics(Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes) {
        Preconditions.checkNotNull(resultPartitionBytes);
        resultPartitionBytes.forEach((partitionId, partitionBytes) -> {
            IntermediateResult result = this.getExecutionGraph().getAllIntermediateResults().get(partitionId.getIntermediateDataSetID());
            Preconditions.checkNotNull((Object)result);
            this.blockingResultInfos.compute(result.getId(), (ignored, resultInfo) -> {
                if (resultInfo == null) {
                    resultInfo = AdaptiveBatchScheduler.createFromIntermediateResult(result, new HashMap<Integer, long[]>());
                }
                resultInfo.recordPartitionInfo(partitionId.getPartitionNumber(), (ResultPartitionBytes)partitionBytes);
                return resultInfo;
            });
        });
    }

    private void notifyFineGrainedSubpartitionBytesMayNotNeeded(BlockingResultInfo resultInfo) {
        IntermediateResult intermediateResult = this.getExecutionGraph().getAllIntermediateResults().get(resultInfo.getResultId());
        if (resultInfo instanceof AllToAllBlockingResultInfo && intermediateResult.areAllConsumerVerticesCreated() && intermediateResult.getConsumerVertices().stream().map(this::getExecutionJobVertex).allMatch(ExecutionJobVertex::isInitialized) && intermediateResult.getConsumerVertices().stream().map(this::getExecutionJobVertex).map(ExecutionJobVertex::getTaskVertices).allMatch(taskVertices -> Arrays.stream(taskVertices).allMatch(taskVertex -> taskVertex.getInputBytes() != -1L))) {
            ((AllToAllBlockingResultInfo)resultInfo).onFineGrainedSubpartitionBytesNotNeeded();
        }
    }

    @Override
    public void allocateSlotsAndDeploy(List<ExecutionVertexID> verticesToDeploy) {
        List<ExecutionVertex> executionVertices = verticesToDeploy.stream().map(this::getExecutionVertex).collect(Collectors.toList());
        this.enrichInputBytesForExecutionVertices(executionVertices);
        super.allocateSlotsAndDeploy(verticesToDeploy);
    }

    @Override
    protected void resetForNewExecution(ExecutionVertexID executionVertexId) {
        this.speculativeExecutionHandler.resetForNewExecution(executionVertexId);
        ExecutionVertex executionVertex = this.getExecutionVertex(executionVertexId);
        if (executionVertex.getExecutionState() == ExecutionState.FINISHED) {
            executionVertex.getProducedPartitions().values().forEach(partition -> this.blockingResultInfos.computeIfPresent(partition.getIntermediateResult().getId(), (ignored, resultInfo) -> {
                resultInfo.resetPartitionInfo(partition.getPartitionNumber());
                return resultInfo;
            }));
        }
        super.resetForNewExecution(executionVertexId);
    }

    @Override
    protected MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
        return rp -> rp.isBlockingOrBlockingPersistentResultPartition() || this.hybridPartitionDataConsumeConstraint.isOnlyConsumeFinishedPartition();
    }

    private void tryComputeSourceParallelismThenRunAsync(BiConsumer<Void, Throwable> action) {
        FutureUtils.ConjunctFuture dynamicSourceParallelismFutures = FutureUtils.waitForAll(this.computeDynamicSourceParallelism());
        dynamicSourceParallelismFutures.whenCompleteAsync(action, (Executor)this.getMainThreadExecutor()).exceptionally(throwable -> {
            this.log.error("An unexpected error occurred while scheduling.", throwable);
            this.handleGlobalFailure(new SuppressRestartsException((Throwable)throwable));
            return null;
        });
    }

    public List<CompletableFuture<Integer>> computeDynamicSourceParallelism() {
        ArrayList<CompletableFuture<Integer>> dynamicSourceParallelismFutures = new ArrayList<CompletableFuture<Integer>>();
        for (ExecutionJobVertex jobVertex : this.getExecutionGraph().getVerticesTopologically()) {
            List<SourceCoordinator<?, ?>> sourceCoordinators = jobVertex.getSourceCoordinators();
            if (sourceCoordinators.isEmpty() || jobVertex.isParallelismDecided()) continue;
            if (this.sourceParallelismFuturesByJobVertexId.containsKey(jobVertex.getJobVertexId())) {
                dynamicSourceParallelismFutures.add(this.sourceParallelismFuturesByJobVertexId.get(jobVertex.getJobVertexId()));
                continue;
            }
            Optional<List<BlockingInputInfo>> consumedResultsInfo = this.tryGetConsumedResultsInfoView(jobVertex);
            if (!consumedResultsInfo.isPresent()) continue;
            List<CompletableFuture<Integer>> sourceParallelismFutures = sourceCoordinators.stream().map(sourceCoordinator -> sourceCoordinator.inferSourceParallelismAsync(this.vertexParallelismAndInputInfosDecider.computeSourceParallelismUpperBound(jobVertex.getJobVertexId(), jobVertex.getMaxParallelism()), this.vertexParallelismAndInputInfosDecider.getDataVolumePerTask())).collect(Collectors.toList());
            CompletableFuture<Integer> dynamicSourceParallelismFuture = AdaptiveBatchScheduler.mergeDynamicParallelismFutures(sourceParallelismFutures);
            this.sourceParallelismFuturesByJobVertexId.put(jobVertex.getJobVertexId(), dynamicSourceParallelismFuture);
            dynamicSourceParallelismFutures.add(dynamicSourceParallelismFuture);
        }
        return dynamicSourceParallelismFutures;
    }

    @VisibleForTesting
    static CompletableFuture<Integer> mergeDynamicParallelismFutures(List<CompletableFuture<Integer>> sourceParallelismFutures) {
        return sourceParallelismFutures.stream().reduce(CompletableFuture.completedFuture(-1), (a, b) -> a.thenCombine((CompletionStage)b, Math::max));
    }

    private void notifyJobVertexFinishedIfPossible(ExecutionJobVertex jobVertex) {
        Optional<Map<IntermediateDataSetID, BlockingResultInfo>> producedResultsInfo = this.getProducedResultsInfo(jobVertex);
        producedResultsInfo.ifPresent(resultInfo -> this.adaptiveExecutionHandler.handleJobEvent(new ExecutionJobVertexFinishedEvent(jobVertex.getJobVertexId(), (Map<IntermediateDataSetID, BlockingResultInfo>)resultInfo)));
    }

    @VisibleForTesting
    public void initializeVerticesIfPossible() {
        ArrayList<ExecutionJobVertex> newlyInitializedJobVertices = new ArrayList<ExecutionJobVertex>();
        try {
            long createTimestamp = System.currentTimeMillis();
            for (ExecutionJobVertex jobVertex : this.getExecutionGraph().getVerticesTopologically()) {
                if (jobVertex.isInitialized()) continue;
                if (this.canInitialize(jobVertex)) {
                    this.initializeJobVertex(jobVertex, jobVertex.getParallelism(), VertexInputInfoComputationUtils.computeVertexInputInfos(jobVertex, this.getExecutionGraph().getAllIntermediateResults()::get), createTimestamp);
                    newlyInitializedJobVertices.add(jobVertex);
                    continue;
                }
                Optional<List<BlockingInputInfo>> consumedResultsInfo = this.tryGetConsumedResultsInfoView(jobVertex);
                if (!consumedResultsInfo.isPresent()) continue;
                ParallelismAndInputInfos parallelismAndInputInfos = this.tryDecideParallelismAndInputInfos(jobVertex, consumedResultsInfo.get());
                this.initializeJobVertex(jobVertex, parallelismAndInputInfos.getParallelism(), parallelismAndInputInfos.getJobVertexInputInfos(), createTimestamp);
                newlyInitializedJobVertices.add(jobVertex);
            }
        }
        catch (JobException ex) {
            this.log.error("Unexpected error occurred when initializing ExecutionJobVertex", (Throwable)((Object)ex));
            this.handleGlobalFailure(new SuppressRestartsException((Throwable)((Object)ex)));
        }
        if (newlyInitializedJobVertices.size() > 0) {
            this.updateTopology(newlyInitializedJobVertices);
        }
    }

    private ParallelismAndInputInfos tryDecideParallelismAndInputInfos(ExecutionJobVertex jobVertex, List<BlockingInputInfo> inputs) {
        int vertexInitialParallelism = this.adaptiveExecutionHandler.getInitialParallelism(jobVertex.getJobVertexId());
        int vertexMinParallelism = -1;
        if (this.sourceParallelismFuturesByJobVertexId.containsKey(jobVertex.getJobVertexId())) {
            int dynamicSourceParallelism = this.getDynamicSourceParallelism(jobVertex);
            if (!inputs.isEmpty()) {
                vertexMinParallelism = dynamicSourceParallelism;
            } else {
                vertexInitialParallelism = dynamicSourceParallelism;
            }
        }
        ParallelismAndInputInfos parallelismAndInputInfos = this.vertexParallelismAndInputInfosDecider.decideParallelismAndInputInfosForVertex(jobVertex.getJobVertexId(), inputs, vertexInitialParallelism, vertexMinParallelism, jobVertex.getMaxParallelism());
        if (vertexInitialParallelism == -1) {
            this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {}.", new Object[]{jobVertex.getName(), jobVertex.getJobVertexId(), parallelismAndInputInfos.getParallelism()});
        } else {
            Preconditions.checkState((parallelismAndInputInfos.getParallelism() == vertexInitialParallelism ? 1 : 0) != 0);
        }
        this.adaptiveExecutionHandler.notifyJobVertexParallelismDecided(jobVertex.getJobVertexId(), parallelismAndInputInfos.getParallelism());
        return parallelismAndInputInfos;
    }

    private int getDynamicSourceParallelism(ExecutionJobVertex jobVertex) {
        CompletableFuture<Integer> dynamicSourceParallelismFuture = this.sourceParallelismFuturesByJobVertexId.get(jobVertex.getJobVertexId());
        int dynamicSourceParallelism = -1;
        if (dynamicSourceParallelismFuture != null) {
            int vertexMaxParallelism;
            dynamicSourceParallelism = dynamicSourceParallelismFuture.join();
            if (dynamicSourceParallelism > (vertexMaxParallelism = jobVertex.getMaxParallelism())) {
                this.log.info("The dynamic inferred source parallelism {} is larger than the maximum parallelism {}. Use {} as the upper bound parallelism of source job vertex {}.", new Object[]{dynamicSourceParallelism, vertexMaxParallelism, vertexMaxParallelism, jobVertex.getJobVertexId()});
                dynamicSourceParallelism = vertexMaxParallelism;
            } else if (dynamicSourceParallelism > 0) {
                this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to dynamic source parallelism inference.", new Object[]{jobVertex.getName(), jobVertex.getJobVertexId(), dynamicSourceParallelism});
            } else {
                dynamicSourceParallelism = -1;
            }
        }
        return dynamicSourceParallelism;
    }

    private void enrichInputBytesForExecutionVertices(List<ExecutionVertex> executionVertices) {
        for (ExecutionVertex ev : executionVertices) {
            List<IntermediateResult> intermediateResults = ev.getJobVertex().getInputs();
            boolean hasHybridEdge = intermediateResults.stream().anyMatch(ir -> ir.getResultType() == ResultPartitionType.HYBRID_FULL || ir.getResultType() == ResultPartitionType.HYBRID_SELECTIVE);
            if (ev.getInputBytes() != -1L || intermediateResults.isEmpty() || hasHybridEdge) continue;
            long inputBytes = 0L;
            for (IntermediateResult intermediateResult : intermediateResults) {
                ExecutionVertexInputInfo inputInfo = ev.getExecutionVertexInputInfo(intermediateResult.getId());
                BlockingResultInfo blockingResultInfo = (BlockingResultInfo)Preconditions.checkNotNull((Object)this.getBlockingResultInfo(intermediateResult.getId()));
                Map<IndexRange, IndexRange> consumedSubpartitionGroups = inputInfo.getConsumedSubpartitionGroups();
                for (Map.Entry<IndexRange, IndexRange> entry : consumedSubpartitionGroups.entrySet()) {
                    IndexRange partitionIndexRange = entry.getKey();
                    IndexRange subpartitionIndexRange = entry.getValue();
                    inputBytes += blockingResultInfo.getNumBytesProduced(partitionIndexRange, subpartitionIndexRange);
                }
                this.notifyFineGrainedSubpartitionBytesMayNotNeeded(blockingResultInfo);
            }
            ev.setInputBytes(inputBytes);
        }
    }

    private void changeJobVertexParallelism(ExecutionJobVertex jobVertex, int parallelism) {
        if (jobVertex.isParallelismDecided()) {
            return;
        }
        jobVertex.getJobVertex().setDynamicParallelism(parallelism);
        try {
            this.getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(this.getJobGraph()));
        }
        catch (Throwable t) {
            this.log.warn("Cannot create JSON plan for job", t);
            this.getExecutionGraph().setJsonPlan("{}");
        }
        jobVertex.setParallelism(parallelism);
    }

    private Optional<List<BlockingInputInfo>> tryGetConsumedResultsInfoView(ExecutionJobVertex jobVertex) {
        ArrayList<BlockingInputInfo> consumableResultInfo = new ArrayList<BlockingInputInfo>();
        DefaultLogicalVertex logicalVertex = this.logicalTopology.getVertex(jobVertex.getJobVertexId());
        Iterator<JobEdge> jobEdges = jobVertex.getJobVertex().getInputs().iterator();
        for (DefaultLogicalResult consumedResult : logicalVertex.getConsumedResults()) {
            Preconditions.checkState((boolean)jobEdges.hasNext());
            JobEdge jobEdge = jobEdges.next();
            ExecutionJobVertex producerVertex = this.getExecutionJobVertex(consumedResult.getProducer().getId());
            if (producerVertex.isFinished()) {
                BlockingResultInfo resultInfo = (BlockingResultInfo)Preconditions.checkNotNull((Object)this.blockingResultInfos.get(consumedResult.getId()));
                consumableResultInfo.add(new BlockingInputInfo(resultInfo, jobEdge.getTypeNumber(), jobEdge.areInterInputsKeysCorrelated(), jobEdge.isIntraInputKeyCorrelated()));
                continue;
            }
            return Optional.empty();
        }
        return Optional.of(consumableResultInfo);
    }

    private Optional<Map<IntermediateDataSetID, BlockingResultInfo>> getProducedResultsInfo(ExecutionJobVertex jobVertex) {
        if (!jobVertex.isFinished()) {
            return Optional.empty();
        }
        HashMap<IntermediateDataSetID, BlockingResultInfo> producedResultInfo = new HashMap<IntermediateDataSetID, BlockingResultInfo>();
        DefaultLogicalVertex logicalVertex = this.logicalTopology.getVertex(jobVertex.getJobVertexId());
        Iterable<DefaultLogicalResult> producedResults = logicalVertex.getProducedResults();
        for (DefaultLogicalResult producedResult : producedResults) {
            BlockingResultInfo resultInfo = (BlockingResultInfo)Preconditions.checkNotNull((Object)this.blockingResultInfos.get(producedResult.getId()));
            producedResultInfo.put(producedResult.getId(), resultInfo);
        }
        return Optional.of(producedResultInfo);
    }

    private boolean canInitialize(ExecutionJobVertex jobVertex) {
        if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided() && this.adaptiveExecutionHandler.getInitialParallelism(jobVertex.getJobVertexId()) == -1) {
            return false;
        }
        if (!jobVertex.isParallelismDecided()) {
            this.changeJobVertexParallelism(jobVertex, this.adaptiveExecutionHandler.getInitialParallelism(jobVertex.getJobVertexId()));
        }
        for (JobEdge inputEdge : jobVertex.getJobVertex().getInputs()) {
            ExecutionJobVertex producerVertex = this.getExecutionGraph().getJobVertex(inputEdge.getSource().getProducer().getID());
            Preconditions.checkNotNull((Object)producerVertex);
            if (producerVertex.isInitialized()) continue;
            return false;
        }
        return true;
    }

    private void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices) {
        for (ExecutionJobVertex vertex : newlyInitializedJobVertices) {
            this.initializeOperatorCoordinatorsFor(vertex);
        }
        this.getExecutionGraph().notifyNewlyInitializedJobVertices(newlyInitializedJobVertices);
    }

    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex vertex) {
        this.operatorCoordinatorHandler.registerAndStartNewCoordinators(vertex.getOperatorCoordinators(), this.getMainThreadExecutor(), vertex.getParallelism());
    }

    @VisibleForTesting
    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(Iterable<JobVertex> vertices, int defaultMaxParallelism) {
        AdaptiveBatchScheduler.resetDynamicParallelism(vertices);
        return AdaptiveBatchScheduler.computeVertexParallelismStore(vertices, v -> AdaptiveBatchScheduler.computeMaxParallelism(v.getParallelism(), defaultMaxParallelism), Function.identity());
    }

    public static int computeMaxParallelism(int parallelism, int defaultMaxParallelism) {
        if (parallelism > 0) {
            return AdaptiveBatchScheduler.getDefaultMaxParallelism(parallelism);
        }
        return defaultMaxParallelism;
    }

    private static void resetDynamicParallelism(Iterable<JobVertex> vertices) {
        for (JobVertex vertex : vertices) {
            if (!vertex.isDynamicParallelism()) continue;
            vertex.setParallelism(-1);
        }
    }

    private static BlockingResultInfo createFromIntermediateResult(IntermediateResult result, Map<Integer, long[]> subpartitionBytesByPartitionIndex) {
        Preconditions.checkArgument((result != null ? 1 : 0) != 0);
        if (result.getConsumingDistributionPattern() == DistributionPattern.POINTWISE) {
            return new PointwiseBlockingResultInfo(result.getId(), result.getNumberOfAssignedPartitions(), result.getPartitions()[0].getNumberOfSubpartitions(), subpartitionBytesByPartitionIndex);
        }
        return new AllToAllBlockingResultInfo(result.getId(), result.getNumberOfAssignedPartitions(), result.getPartitions()[0].getNumberOfSubpartitions(), result.isSingleSubpartitionContainsAllData(), subpartitionBytesByPartitionIndex);
    }

    @VisibleForTesting
    BlockingResultInfo getBlockingResultInfo(IntermediateDataSetID resultId) {
        return this.blockingResultInfos.get(resultId);
    }

    @VisibleForTesting
    SpeculativeExecutionHandler getSpeculativeExecutionHandler() {
        return this.speculativeExecutionHandler;
    }

    private void tryUpdateResultInfo(IntermediateDataSetID id, DistributionPattern targetPattern) {
        if (this.blockingResultInfos.containsKey(id)) {
            BlockingResultInfo resultInfo = this.blockingResultInfos.get(id);
            IntermediateResult result = this.getExecutionGraph().getAllIntermediateResults().get(id);
            if (targetPattern == DistributionPattern.ALL_TO_ALL && resultInfo.isPointwise() || targetPattern == DistributionPattern.POINTWISE && !resultInfo.isPointwise()) {
                BlockingResultInfo newInfo = AdaptiveBatchScheduler.createFromIntermediateResult(result, resultInfo.getSubpartitionBytesByPartitionIndex());
                this.blockingResultInfos.put(id, newInfo);
            } else if (resultInfo instanceof AllToAllBlockingResultInfo) {
                ((AllToAllBlockingResultInfo)resultInfo).setBroadcast(result.isBroadcast());
            }
        }
    }

    private class DefaultBatchJobRecoveryContext
    implements BatchJobRecoveryContext {
        private final FailoverStrategy restartStrategyOnResultConsumable;
        private final FailoverStrategy restartStrategyNotOnResultConsumable;

        private DefaultBatchJobRecoveryContext() {
            this.restartStrategyOnResultConsumable = new RestartPipelinedRegionFailoverStrategy.Factory().create(AdaptiveBatchScheduler.this.getSchedulingTopology(), AdaptiveBatchScheduler.this.getResultPartitionAvailabilityChecker());
            this.restartStrategyNotOnResultConsumable = new RestartPipelinedRegionFailoverStrategy.Factory().create(AdaptiveBatchScheduler.this.getSchedulingTopology(), ignored -> true);
        }

        @Override
        public ExecutionGraph getExecutionGraph() {
            return AdaptiveBatchScheduler.this.getExecutionGraph();
        }

        @Override
        public ShuffleMaster<?> getShuffleMaster() {
            return AdaptiveBatchScheduler.this.shuffleMaster;
        }

        @Override
        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID vertexId, boolean considerResultConsumable) {
            if (considerResultConsumable) {
                return this.restartStrategyOnResultConsumable.getTasksNeedingRestart(vertexId, null);
            }
            return this.restartStrategyNotOnResultConsumable.getTasksNeedingRestart(vertexId, null);
        }

        @Override
        public ComponentMainThreadExecutor getMainThreadExecutor() {
            return AdaptiveBatchScheduler.this.getMainThreadExecutor();
        }

        @Override
        public void resetVerticesInRecovering(Set<ExecutionVertexID> verticesToReset) throws Exception {
            for (ExecutionVertexID executionVertexID : verticesToReset) {
                AdaptiveBatchScheduler.this.notifyCoordinatorsAboutTaskFailure(AdaptiveBatchScheduler.this.getExecutionVertex(executionVertexID).getCurrentExecutionAttempt(), null);
            }
            AdaptiveBatchScheduler.this.resetForNewExecutions(verticesToReset);
            AdaptiveBatchScheduler.this.restoreState(verticesToReset, false);
        }

        @Override
        public void updateResultPartitionBytesMetrics(Map<IntermediateResultPartitionID, ResultPartitionBytes> resultPartitionBytes) {
            AdaptiveBatchScheduler.this.updateResultPartitionBytesMetrics(resultPartitionBytes);
        }

        @Override
        public void initializeJobVertex(ExecutionJobVertex jobVertex, int parallelism, Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos, long createTimestamp) throws JobException {
            AdaptiveBatchScheduler.this.initializeJobVertex(jobVertex, parallelism, jobVertexInputInfos, createTimestamp);
        }

        @Override
        public void updateTopology(List<ExecutionJobVertex> newlyInitializedJobVertices) {
            AdaptiveBatchScheduler.this.updateTopology(newlyInitializedJobVertices);
        }

        @Override
        public void onRecoveringFinished(Set<JobVertexID> jobVerticesWithUnRecoveredCoordinators) {
            AdaptiveBatchScheduler.this.jobVerticesWithUnRecoveredCoordinators = new HashSet<JobVertexID>(jobVerticesWithUnRecoveredCoordinators);
            AdaptiveBatchScheduler.this.tryComputeSourceParallelismThenRunAsync((value, throwable) -> AdaptiveBatchScheduler.this.schedulingStrategy.scheduleAllVerticesIfPossible());
        }

        @Override
        public void onRecoveringFailed() {
            AdaptiveBatchScheduler.this.initializeVerticesIfPossible();
            AdaptiveBatchScheduler.this.handleGlobalFailure((Throwable)new FlinkRuntimeException("Recover failed from JM failover, fail global."));
        }

        @Override
        public void failJob(Throwable cause, long timestamp, CompletableFuture<Map<String, String>> failureLabels) {
            AdaptiveBatchScheduler.this.failJob(cause, timestamp, failureLabels);
        }
    }
}

