/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumerator;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.event.JobEventStore;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultBatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.DummyTierFactory;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.DefaultShuffleMetrics;
import org.apache.flink.runtime.shuffle.JobShuffleContext;
import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleMasterContextImpl;
import org.apache.flink.runtime.shuffle.ShuffleMetrics;
import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.topology.Result;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={ParameterizedTestExtension.class})
public class BatchJobRecoveryTest {
    // Not referenced in the visible part of this file.
    // NOTE(review): presumably handed to the scheduler as the previous-worker recovery timeout - confirm.
    private final Duration previousWorkerRecoveryTimeout = Duration.ofSeconds(1L);
    // JUnit-managed temporary directory; setUp() creates the job event store root folder inside it.
    @TempDir
    private java.nio.file.Path temporaryFolder;
    // Re-created in setUp(); assignSplits() uses it to create the gateway for a source subtask attempt.
    protected EventReceivingTasks receivingTasks;
    // Shared executor extension that backs delayedExecutor.
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    // Extension that provides the component main-thread executor used by the tests.
    @RegisterExtension
    static final TestingComponentMainThreadExecutor.Extension MAIN_EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Extension();
    private final TestingComponentMainThreadExecutor mainThreadExecutor = MAIN_EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();
    // Reset in setUp(); testReplayEventFailed() swaps in a ManuallyTriggeredScheduledExecutor.
    private ScheduledExecutor delayedExecutor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
    // Operator id given to the SourceCoordinatorProvider built in setUp().
    private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
    // NOTE(review): presumably the split count passed to MockSource in setUp() (literal 10 there) - confirm.
    private static final int NUM_SPLITS = 10;
    // NOTE(review): the parallelism constants are presumably applied when the default job graph is
    // built (not visible here); the tests below compare against the same values as literals - confirm.
    private static final int SOURCE_PARALLELISM = 5;
    private static final int MIDDLE_PARALLELISM = 5;
    private static final int DECIDED_SINK_PARALLELISM = 2;
    // Ids used by the tests to address the source, middle and sink job vertices.
    private static final JobVertexID SOURCE_ID = new JobVertexID();
    private static final JobVertexID MIDDLE_ID = new JobVertexID();
    private static final JobVertexID SINK_ID = new JobVertexID();
    private static final JobID JOB_ID = new JobID();
    // Source coordinator provider created in setUp() around a bounded MockSource.
    private SourceCoordinatorProvider<MockSourceSplit> provider;
    // Event store created in setUp(); tests stop it with stop(false) before starting the second
    // scheduler, and after() stops it with stop(true).
    private FileSystemJobEventStore jobEventStore;
    // Flag handed to the testing event store in setUp().
    // NOTE(review): presumably set once recovery begins reading events - confirm.
    private AtomicBoolean recoveryStarted;
    // List handed to the testing event store in setUp(); polled by
    // waitUntilWriteExecutionVertexFinishedEventPersisted() to count ExecutionVertexFinishedEvents.
    private List<JobEvent> persistedJobEventList;
    // Serialized form of the default job graph; tests deserialize a fresh JobGraph from it.
    private byte[] serializedJobGraph;
    // Cleared in setUp().
    // NOTE(review): presumably filled by registerPartitions(...) (not visible here) - confirm.
    private final Collection<PartitionWithMetrics> allPartitionWithMetrics = new ArrayList<PartitionWithMetrics>();
    // First test parameter (index 0), see parameters().
    @Parameter
    public boolean enableSpeculativeExecution;
    // Second test parameter (index 1), see parameters().
    @Parameter(value=1)
    public boolean isBlockingShuffle;

    /**
     * Supplies the parameter matrix for the test templates: every combination of the two boolean
     * switches.
     *
     * @return four {@code {enableSpeculativeExecution, isBlockingShuffle}} pairs, in the order
     *     (false, false), (false, true), (true, true), (true, false)
     */
    @Parameters(name="enableSpeculativeExecution={0}, isBlockingShuffle={1}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(
                new Object[] {false, false},
                new Object[] {false, true},
                new Object[] {true, true},
                new Object[] {true, false});
    }

    /**
     * Builds a fresh fixture before every test invocation: a new event-store root folder under the
     * JUnit temp directory, a new delayed executor, the event-receiving tasks, the testing job
     * event store (wired to {@code persistedJobEventList} and {@code recoveryStarted}), the source
     * coordinator provider and the serialized default job graph.
     *
     * @throws IOException if the temporary folder cannot be created or the job graph cannot be
     *     serialized
     */
    @BeforeEach
    void setUp() throws IOException {
        Path rootPath = new Path(TempDirUtils.newFolder(temporaryFolder).getAbsolutePath());

        // A test may have replaced the delayed executor (see testReplayEventFailed), so reset it.
        delayedExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
        receivingTasks = EventReceivingTasks.createForRunningTasks();

        persistedJobEventList = new ArrayList<>();
        recoveryStarted = new AtomicBoolean();
        jobEventStore =
                new TestingFileSystemJobEventStore(
                        rootPath, new Configuration(), persistedJobEventList, recoveryStarted);

        // Diamond inference instead of raw types; NUM_SPLITS replaces the inlined literal 10.
        provider =
                new SourceCoordinatorProvider<>(
                        "AdaptiveBatchSchedulerTest",
                        OPERATOR_ID,
                        new MockSource(Boundedness.BOUNDED, NUM_SPLITS),
                        1,
                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
                        null);

        serializedJobGraph = serializeJobGraph(createDefaultJobGraph());
        allPartitionWithMetrics.clear();
    }

    /** Stops the job event store after each test, passing {@code clear = true}. */
    @AfterEach
    void after() {
        final boolean clear = true;
        jobEventStore.stop(clear);
    }

    /**
     * Runs a first scheduler until all source executions are FINISHED and the middle executions
     * are RUNNING, stops the event store, registers the partitions and then starts a second
     * scheduler that recovers from the persisted events.
     *
     * <p>Expected after recovery: every source vertex keeps its original attempt id, is FINISHED,
     * has its produced partitions tracked and keeps its subpartition count; every middle vertex has
     * a new attempt id and reaches DEPLOYING.
     */
    @TestTemplate
    void testRecoverFromJMFailover() throws Exception {
        AdaptiveBatchScheduler scheduler = createScheduler(deserializeJobGraph(serializedJobGraph));

        // Drive the first scheduler: finish all sources, then bring the middle vertex to RUNNING.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));
        runInMainThread(
                () -> {
                    transitionExecutionsState(scheduler, ExecutionState.INITIALIZING, MIDDLE_ID);
                    transitionExecutionsState(scheduler, ExecutionState.RUNNING, MIDDLE_ID);
                });

        // Remember what the first scheduler looked like so the recovered one can be compared.
        List<ExecutionAttemptID> sourceExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> middleExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(MIDDLE_ID));
        HashMap<IntermediateResultPartitionID, Integer> subpartitionNums = new HashMap<>();
        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, scheduler.getExecutionGraph())) {
            IntermediateResultPartition partition =
                    vertex.getProducedPartitions().values().iterator().next();
            subpartitionNums.put(partition.getPartitionId(), partition.getNumberOfSubpartitions());
        }

        // One finished event per source subtask must be persisted before the store is stopped
        // (clear = false, so the events stay available for the second scheduler).
        waitUntilWriteExecutionVertexFinishedEventPersisted(SOURCE_PARALLELISM);
        runInMainThread(() -> jobEventStore.stop(false));

        registerPartitions(scheduler);

        // Start a second scheduler on a freshly deserialized job graph and let it recover.
        AdaptiveBatchScheduler newScheduler = createScheduler(deserializeJobGraph(serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(newScheduler);

        DefaultExecutionGraph recoveredGraph = (DefaultExecutionGraph) newScheduler.getExecutionGraph();
        JobMasterPartitionTracker partitionTracker =
                ((InternalExecutionGraphAccessor) newScheduler.getExecutionGraph()).getPartitionTracker();

        // Source vertices: state, attempt ids, partition tracking and subpartition counts recovered.
        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(sourceExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);

            List<ResultPartitionID> resultPartitionIds =
                    vertex.getProducedPartitions().keySet().stream()
                            .map(recoveredGraph::createResultPartitionId)
                            .collect(Collectors.toList());
            for (ResultPartitionID partitionID : resultPartitionIds) {
                Assertions.assertThat(partitionTracker.isPartitionTracked(partitionID)).isTrue();
            }

            IntermediateResultPartition partition =
                    vertex.getProducedPartitions().values().iterator().next();
            Assertions.assertThat(partition.getNumberOfSubpartitions())
                    .isEqualTo(subpartitionNums.get(partition.getPartitionId()));
        }

        // Middle vertices were not finished before the failover, so they run as new attempts.
        for (ExecutionVertex vertex : getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(middleExecutions)
                    .doesNotContain(vertex.getCurrentExecutionAttempt().getAttemptId());
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.DEPLOYING, 15000L);
        }
    }

    /**
     * Attaches a {@link TestingOperatorCoordinator} to the middle job vertex (per the test name, a
     * coordinator that does not support batch snapshots), finishes all sources and only the first
     * middle subtask, then recovers with a second scheduler while the partitions of source subtask
     * 0 are passed to {@code registerPartitions} as the third argument.
     *
     * <p>Expected after recovery: source subtask 0 reaches DEPLOYING again; the other sources keep
     * their attempt ids, are FINISHED and have tracked partitions; no middle vertex keeps its old
     * attempt id; middle subtask 0 is CREATED (blocking shuffle) or DEPLOYING (otherwise), and the
     * remaining middle subtasks reach DEPLOYING.
     */
    @TestTemplate
    void testJobVertexUnFinishedAndOperatorCoordinatorNotSupportBatchSnapshot() throws Exception {
        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
        JobVertex middleJobVertex = jobGraph.findVertexByID(MIDDLE_ID);
        middleJobVertex.addOperatorCoordinator(
                new SerializedValue<>(
                        new TestingOperatorCoordinator.Provider(
                                middleJobVertex.getOperatorIDs().get(0).getGeneratedOperatorID())));

        AdaptiveBatchScheduler scheduler = createScheduler(jobGraph, Duration.ZERO);

        // Drive the first scheduler: finish all sources and the first middle subtask only.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));
        runInMainThread(
                () -> {
                    ExecutionVertex firstMiddle =
                            getExecutionVertex(MIDDLE_ID, 0, scheduler.getExecutionGraph());
                    AdaptiveBatchSchedulerTest.transitionExecutionsState(
                            scheduler,
                            ExecutionState.FINISHED,
                            Collections.singletonList(firstMiddle.getCurrentExecutionAttempt()),
                            null);
                });

        List<ExecutionAttemptID> sourceExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> middleExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(MIDDLE_ID));

        // All source subtasks plus one middle subtask have finished.
        waitUntilWriteExecutionVertexFinishedEventPersisted(SOURCE_PARALLELISM + 1);
        runInMainThread(() -> jobEventStore.stop(false));

        // NOTE(review): the third argument is presumably the set of vertices whose partitions are
        // reported as missing; registerPartitions is not visible here - confirm.
        int subtaskIndex = 0;
        registerPartitions(
                scheduler,
                Collections.emptySet(),
                Collections.singleton(
                        scheduler.getExecutionJobVertex(SOURCE_ID).getTaskVertices()[subtaskIndex].getID()));

        // Recover with a second scheduler on the same JobGraph instance.
        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
        startSchedulingAndWaitRecoverFinish(newScheduler);

        DefaultExecutionGraph recoveredGraph = (DefaultExecutionGraph) newScheduler.getExecutionGraph();
        JobMasterPartitionTracker partitionTracker =
                ((InternalExecutionGraphAccessor) newScheduler.getExecutionGraph()).getPartitionTracker();

        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
            if (vertex.getParallelSubtaskIndex() == subtaskIndex) {
                // This source subtask is expected to be deployed again.
                ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.DEPLOYING, 15000L);
                continue;
            }

            Assertions.assertThat(sourceExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);

            List<ResultPartitionID> resultPartitionIds =
                    vertex.getProducedPartitions().keySet().stream()
                            .map(recoveredGraph::createResultPartitionId)
                            .collect(Collectors.toList());
            for (ResultPartitionID partitionID : resultPartitionIds) {
                Assertions.assertThat(partitionTracker.isPartitionTracked(partitionID)).isTrue();
            }
        }

        // No middle execution is recovered, not even the one that had finished.
        for (ExecutionVertex vertex : getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(middleExecutions)
                    .doesNotContain(vertex.getCurrentExecutionAttempt().getAttemptId());

            if (vertex.getParallelSubtaskIndex() == subtaskIndex) {
                // Checked immediately (no waiting): the expected state depends on the shuffle mode.
                ExecutionState expectedState =
                        isBlockingShuffle ? ExecutionState.CREATED : ExecutionState.DEPLOYING;
                Assertions.assertThat(vertex.getExecutionState()).isEqualTo(expectedState);
                continue;
            }

            ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.DEPLOYING, 15000L);
        }
    }

    /**
     * Attaches a {@link TestingOperatorCoordinator} to the source job vertex (per the test name, a
     * coordinator that does not support batch snapshots), finishes all sources, recovers with a
     * second scheduler and then fails the first middle task with a
     * {@link PartitionNotFoundException}.
     *
     * <p>Expected: directly after recovery the sources keep their attempt ids, are FINISHED and
     * have tracked partitions, and all middle tasks reach DEPLOYING. After the partition-not-found
     * failure, source subtask 0 reaches DEPLOYING and no source subtask is FINISHED any more.
     */
    @TestTemplate
    void testJobVertexFinishedAndOperatorCoordinatorNotSupportBatchSnapshotAndPartitionNotFound() throws Exception {
        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
        JobVertex sourceJobVertex = jobGraph.findVertexByID(SOURCE_ID);
        sourceJobVertex.addOperatorCoordinator(
                new SerializedValue<>(
                        new TestingOperatorCoordinator.Provider(
                                sourceJobVertex.getOperatorIDs().get(0).getGeneratedOperatorID())));

        AdaptiveBatchScheduler scheduler = createScheduler(jobGraph);

        // Drive the first scheduler until all sources are finished.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));

        List<ExecutionAttemptID> sourceExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));

        waitUntilWriteExecutionVertexFinishedEventPersisted(SOURCE_PARALLELISM);
        runInMainThread(() -> jobEventStore.stop(false));

        registerPartitions(scheduler);

        // Recover with a second scheduler on the same JobGraph instance.
        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
        startSchedulingAndWaitRecoverFinish(newScheduler);

        DefaultExecutionGraph recoveredGraph = (DefaultExecutionGraph) newScheduler.getExecutionGraph();
        JobMasterPartitionTracker partitionTracker =
                ((InternalExecutionGraphAccessor) newScheduler.getExecutionGraph()).getPartitionTracker();

        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(sourceExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);

            List<ResultPartitionID> resultPartitionIds =
                    vertex.getProducedPartitions().keySet().stream()
                            .map(recoveredGraph::createResultPartitionId)
                            .collect(Collectors.toList());
            for (ResultPartitionID partitionID : resultPartitionIds) {
                Assertions.assertThat(partitionTracker.isPartitionTracked(partitionID)).isTrue();
            }
        }

        for (ExecutionVertex taskVertex : getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(taskVertex, ExecutionState.DEPLOYING, 15000L);
        }

        // NOTE(review): this transitions the MIDDLE executions of the pre-failover `scheduler`,
        // not of `newScheduler`, which looks unintentional. Kept as-is because switching schedulers
        // could change what the assertions below observe - confirm the intent.
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.RUNNING, MIDDLE_ID));

        // Fail the first recovered middle task with a partition-not-found error.
        ExecutionVertex firstMiddleTask = getExecutionVertex(MIDDLE_ID, 0, newScheduler.getExecutionGraph());
        triggerFailedByDataConsumptionException(newScheduler, firstMiddleTask);

        ExecutionGraphTestUtils.waitUntilExecutionVertexState(
                getExecutionVertex(SOURCE_ID, 0, newScheduler.getExecutionGraph()),
                ExecutionState.DEPLOYING,
                15000L);

        // None of the source subtasks may still be FINISHED.
        for (int i = 0; i < SOURCE_PARALLELISM; ++i) {
            Assertions.assertThat(
                            getExecutionVertex(SOURCE_ID, i, newScheduler.getExecutionGraph())
                                    .getExecutionState())
                    .isNotEqualTo(ExecutionState.FINISHED);
        }
    }

    /**
     * Finishes all sources on a first scheduler, then recovers with a second scheduler while the
     * vertex id of source subtask 0 is passed to {@code registerPartitions} as the third argument.
     *
     * <p>Expected after recovery: source subtask 0 runs as a new attempt and reaches DEPLOYING;
     * every other source subtask keeps its original attempt id and is FINISHED.
     */
    @TestTemplate
    void testRecoverFromJMFailoverAndPartitionsUnavailable() throws Exception {
        AdaptiveBatchScheduler scheduler = createScheduler(deserializeJobGraph(serializedJobGraph));

        // Drive the first scheduler until all sources are finished.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));

        List<ExecutionAttemptID> sourceExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));

        waitUntilWriteExecutionVertexFinishedEventPersisted(SOURCE_PARALLELISM);
        runInMainThread(() -> jobEventStore.stop(false));

        // NOTE(review): the third argument is presumably the set of vertices whose partitions are
        // reported as lost; registerPartitions is not visible here - confirm.
        int losePartitionsTaskIndex = 0;
        registerPartitions(
                scheduler,
                Collections.emptySet(),
                Collections.singleton(
                        getExecutionVertex(SOURCE_ID, losePartitionsTaskIndex, scheduler.getExecutionGraph())
                                .getID()));

        // Recover with a second scheduler on a freshly deserialized job graph.
        AdaptiveBatchScheduler newScheduler = createScheduler(deserializeJobGraph(serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(newScheduler);

        List<ExecutionVertex> sourceTasks = getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph());
        for (int i = 0; i < sourceTasks.size(); ++i) {
            ExecutionVertex vertex = sourceTasks.get(i);
            ExecutionAttemptID currentAttemptId = vertex.getCurrentExecutionAttempt().getAttemptId();

            if (i == losePartitionsTaskIndex) {
                // This subtask must be re-executed with a new attempt.
                Assertions.assertThat(sourceExecutions).doesNotContain(currentAttemptId);
                ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.DEPLOYING, 15000L);
                continue;
            }

            Assertions.assertThat(sourceExecutions).contains(currentAttemptId);
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }
    }

    /**
     * Finishes the source, middle and sink vertices on a first scheduler and then recovers with a
     * second scheduler created from the very same {@link JobGraph} instance.
     *
     * <p>Expected after recovery: all source, middle and sink vertices keep their original attempt
     * ids and are FINISHED, and the sink's parallelism equals {@code DECIDED_SINK_PARALLELISM}.
     */
    @TestTemplate
    void testRecoverDecidedParallelismFromTheSameJobGraphInstance() throws Exception {
        JobGraph jobGraph = deserializeJobGraph(serializedJobGraph);
        AdaptiveBatchScheduler scheduler = createScheduler(jobGraph);

        // Drive the first scheduler until every vertex of the job is finished.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, MIDDLE_ID));
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SINK_ID));

        List<ExecutionAttemptID> sourceExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
        List<ExecutionAttemptID> middleExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(MIDDLE_ID));
        List<ExecutionAttemptID> sinkExecutions =
                getCurrentAttemptIds(scheduler.getExecutionJobVertex(SINK_ID));

        // One finished event per subtask of all three vertices (5 + 5 + 2 = 12).
        waitUntilWriteExecutionVertexFinishedEventPersisted(
                SOURCE_PARALLELISM + MIDDLE_PARALLELISM + DECIDED_SINK_PARALLELISM);
        runInMainThread(() -> jobEventStore.stop(false));

        // Recover with a second scheduler that reuses the same JobGraph instance.
        AdaptiveBatchScheduler newScheduler = createScheduler(jobGraph);
        startSchedulingAndWaitRecoverFinish(newScheduler);

        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(sourceExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }

        for (ExecutionVertex vertex : getExecutionVertices(MIDDLE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(middleExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }

        Assertions.assertThat(newScheduler.getExecutionJobVertex(SINK_ID).getParallelism())
                .isEqualTo(DECIDED_SINK_PARALLELISM);
        for (ExecutionVertex vertex : getExecutionVertices(SINK_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(sinkExecutions)
                    .contains(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FINISHED);
        }
    }

    /**
     * Assigns all splits and finishes all sources on a first scheduler, recovers with a second
     * scheduler and then fails two different middle tasks, one after the other, with a
     * partition-not-found error.
     *
     * <p>Checked along the way: the recovered source coordinator has 0 unassigned splits; after
     * the first failure it has 2; after re-assigning splits to source subtask 0 it has 0 again;
     * after finishing the sources and triggering the second failure it has 2 once more. Each failed
     * middle task is awaited in CREATED (blocking shuffle) or DEPLOYING (otherwise).
     */
    @TestTemplate
    void testPartitionNotFoundTwiceAfterJMFailover() throws Exception {
        AdaptiveBatchScheduler scheduler = createScheduler(deserializeJobGraph(serializedJobGraph));

        // First scheduler: start, hand out every split, then finish all sources.
        runInMainThread(scheduler::startScheduling);
        runInMainThread(
                () -> {
                    SourceCoordinator<?, ?> coordinator =
                            getInternalSourceCoordinator(scheduler.getExecutionGraph(), SOURCE_ID);
                    List<ExecutionAttemptID> sourceAttempts =
                            getCurrentAttemptIds(scheduler.getExecutionJobVertex(SOURCE_ID));
                    assignSplitsForAllSubTask(coordinator, sourceAttempts);
                    checkUnassignedSplits(coordinator, 0);
                });
        runInMainThread(() -> transitionExecutionsState(scheduler, ExecutionState.FINISHED, SOURCE_ID));

        waitUntilWriteExecutionVertexFinishedEventPersisted(5);
        runInMainThread(() -> jobEventStore.stop(false));
        registerPartitions(scheduler);

        // Second scheduler: recover and verify the coordinator state came back.
        AdaptiveBatchScheduler newScheduler = createScheduler(deserializeJobGraph(serializedJobGraph));
        startSchedulingAndWaitRecoverFinish(newScheduler);

        SourceCoordinator<?, ?> recoveredCoordinator =
                getInternalSourceCoordinator(newScheduler.getExecutionGraph(), SOURCE_ID);
        runInMainThread(() -> checkUnassignedSplits(recoveredCoordinator, 0));

        // First partition-not-found failure, reported by middle subtask 0.
        ExecutionVertex middleSubtask0 = getExecutionVertex(MIDDLE_ID, 0, newScheduler.getExecutionGraph());
        triggerFailedByDataConsumptionException(newScheduler, middleSubtask0);

        ExecutionState stateAfterFailure;
        if (isBlockingShuffle) {
            stateAfterFailure = ExecutionState.CREATED;
        } else {
            stateAfterFailure = ExecutionState.DEPLOYING;
        }
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(middleSubtask0, stateAfterFailure, 15000L);
        runInMainThread(() -> checkUnassignedSplits(recoveredCoordinator, 2));

        // Re-assign the splits to the current attempt of source subtask 0 and finish the sources.
        runInMainThread(
                () -> {
                    ExecutionAttemptID restartedSourceAttempt =
                            getExecutionVertex(SOURCE_ID, 0, newScheduler.getExecutionGraph())
                                    .getCurrentExecutionAttempt()
                                    .getAttemptId();
                    assignSplits(recoveredCoordinator, restartedSourceAttempt);
                    checkUnassignedSplits(recoveredCoordinator, 0);
                });
        runInMainThread(() -> transitionExecutionsState(newScheduler, ExecutionState.FINISHED, SOURCE_ID));

        // Second partition-not-found failure, reported by middle subtask 1.
        ExecutionVertex middleSubtask1 = getExecutionVertex(MIDDLE_ID, 1, newScheduler.getExecutionGraph());
        triggerFailedByDataConsumptionException(newScheduler, middleSubtask1);
        ExecutionGraphTestUtils.waitUntilExecutionVertexState(middleSubtask1, stateAfterFailure, 15000L);
        runInMainThread(() -> checkUnassignedSplits(recoveredCoordinator, 2));
    }

    /**
     * Starts a scheduler on top of a {@link JobEventStore} that reports itself as non-empty but
     * throws from every {@code readEvent()} call.
     *
     * <p>Expected: the execution graph's failure cause contains the message
     * {@code "Recover failed from JM failover"}, and every source vertex is at attempt number 1 and
     * reaches DEPLOYING.
     */
    @TestTemplate
    void testReplayEventFailed() throws Exception {
        // An event store that has (allegedly) something to replay but fails on every read.
        JobEventStore failingJobEventStore =
                new JobEventStore() {

                    @Override
                    public void start() {
                    }

                    @Override
                    public void stop(boolean clear) {
                    }

                    @Override
                    public void writeEvent(JobEvent event, boolean cutBlock) {
                    }

                    @Override
                    public JobEvent readEvent() throws Exception {
                        throw new Exception();
                    }

                    @Override
                    public boolean isEmpty() {
                        return false;
                    }
                };

        // Use a manually triggered executor so the test decides when scheduled tasks run.
        ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
        delayedExecutor = taskRestartExecutor;

        AdaptiveBatchScheduler newScheduler =
                createScheduler(
                        deserializeJobGraph(serializedJobGraph),
                        failingJobEventStore,
                        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue(),
                        BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE.defaultValue());

        runInMainThread(newScheduler::startScheduling);
        taskRestartExecutor.triggerScheduledTasks();
        // NOTE(review): the empty task presumably acts as a barrier so that work queued on the
        // main thread has run before the assertions - confirm against runInMainThread.
        runInMainThread(() -> {});

        Assertions.assertThat(
                        ExceptionUtils.findThrowableWithMessage(
                                newScheduler.getExecutionGraph().getFailureCause(),
                                "Recover failed from JM failover"))
                .isPresent();

        for (ExecutionVertex vertex : getExecutionVertices(SOURCE_ID, newScheduler.getExecutionGraph())) {
            Assertions.assertThat(vertex.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(1);
            ExecutionGraphTestUtils.waitUntilExecutionVertexState(vertex, ExecutionState.DEPLOYING, 15000L);
        }
    }

    /**
     * Waits until exactly {@code count} events of type {@link ExecutionVertexFinishedEvent}
     * are present in {@code persistedJobEventList}.
     *
     * @param count the exact number of finished events to wait for
     * @throws Exception propagated from {@code CommonTestUtils.waitUntilCondition}
     */
    private void waitUntilWriteExecutionVertexFinishedEventPersisted(int count) throws Exception {
        SupplierWithException<Boolean, Exception> expectedCountReached = () -> {
            // Count on a private copy so the check never iterates the shared list directly.
            List<JobEvent> snapshot = new ArrayList<JobEvent>(this.persistedJobEventList);
            long finishedEvents = 0L;
            for (JobEvent persisted : snapshot) {
                if (persisted instanceof ExecutionVertexFinishedEvent) {
                    ++finishedEvents;
                }
            }
            return finishedEvents == (long)count;
        };
        CommonTestUtils.waitUntilCondition(expectedCountReached);
    }

    /**
     * Transitions the current execution attempt of {@code executionVertex} to
     * {@link ExecutionState#FAILED} with a {@link PartitionNotFoundException} that refers to
     * the first result partition the vertex consumes. The work is submitted through
     * {@link #runInMainThread}.
     *
     * @param scheduler the scheduler owning the execution graph
     * @param executionVertex the vertex whose current attempt is failed
     */
    private void triggerFailedByDataConsumptionException(SchedulerBase scheduler, ExecutionVertex executionVertex) {
        ThrowingRunnable<Throwable> failCurrentAttempt = () -> {
            List<IntermediateResultPartitionID> consumedPartitions = BatchJobRecoveryTest.getConsumedResultPartitions(scheduler.getExecutionGraph().getSchedulingTopology(), executionVertex.getID());
            IntermediateResultPartitionID firstConsumed = consumedPartitions.get(0);
            List<Execution> failingAttempts = Collections.singletonList(executionVertex.getCurrentExecutionAttempt());
            DefaultExecutionGraph defaultGraph = (DefaultExecutionGraph)scheduler.getExecutionGraph();
            Throwable cause = new PartitionNotFoundException(defaultGraph.createResultPartitionId(firstConsumed));
            AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, ExecutionState.FAILED, failingAttempts, cause);
        };
        this.runInMainThread(failCurrentAttempt);
    }

    /**
     * Marks the given attempt as ready on the coordinator, using a gateway created from
     * {@code receivingTasks}, and then delivers a {@link ReaderRegistrationEvent} for the
     * same subtask and attempt.
     *
     * @param sourceCoordinator the coordinator to notify
     * @param attemptId supplies the subtask index and attempt number
     */
    private void assignSplits(SourceCoordinator<?, ?> sourceCoordinator, ExecutionAttemptID attemptId) {
        final int subtaskIndex = attemptId.getSubtaskIndex();
        final int attempt = attemptId.getAttemptNumber();

        sourceCoordinator.executionAttemptReady(subtaskIndex, attempt, this.receivingTasks.createGatewayForSubtask(subtaskIndex, attempt));

        OperatorEvent registration = new ReaderRegistrationEvent(subtaskIndex, "location_" + subtaskIndex);
        sourceCoordinator.handleEventFromOperator(subtaskIndex, attempt, registration);
    }

    /**
     * Calls {@link #assignSplits} once for every attempt id in {@code attemptIds}, in list order.
     *
     * @param sourceCoordinator the coordinator passed to each call
     * @param attemptIds the attempts to process
     */
    private void assignSplitsForAllSubTask(SourceCoordinator<?, ?> sourceCoordinator, List<ExecutionAttemptID> attemptIds) {
        for (ExecutionAttemptID currentAttempt : attemptIds) {
            this.assignSplits(sourceCoordinator, currentAttempt);
        }
    }

    /**
     * Asserts that the coordinator's enumerator, cast to {@link MockSplitEnumerator}, currently
     * reports exactly {@code expected} unassigned splits. The assertion itself is submitted
     * through {@link #runInCoordinatorThread}.
     *
     * @param sourceCoordinator the coordinator whose enumerator is inspected
     * @param expected the expected number of unassigned splits
     */
    private void checkUnassignedSplits(SourceCoordinator<?, ?> sourceCoordinator, int expected) {
        MockSplitEnumerator enumerator = (MockSplitEnumerator)sourceCoordinator.getEnumerator();
        Runnable sizeAssertion = () -> {
            Collection<?> unassigned = enumerator.getUnassignedSplits();
            Assertions.assertThat(unassigned).hasSize(expected);
        };
        this.runInCoordinatorThread(sourceCoordinator, sizeAssertion);
    }

    /**
     * Submits {@code runnable} to the coordinator's executor and blocks until it has finished.
     *
     * <p>Any exception raised while submitting or waiting fails the test. The exception is
     * attached as the cause of the failure so its stack trace (including that of an assertion
     * error thrown inside {@code runnable}) stays visible in the test report.
     *
     * @param sourceCoordinator the coordinator whose executor runs the action
     * @param runnable the action to run
     */
    private void runInCoordinatorThread(SourceCoordinator<?, ?> sourceCoordinator, Runnable runnable) {
        try {
            sourceCoordinator.getCoordinatorExecutor().submit(runnable).get();
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
            // Same message as before, but keep the original exception as the cause.
            org.junit.jupiter.api.Assertions.fail("Test failed due to " + e, e);
        }
    }

    /**
     * Hands the given action to {@code mainThreadExecutor} for execution.
     *
     * @param throwingRunnable the action to execute; must not be {@code null}
     */
    private void runInMainThread(@Nonnull ThrowingRunnable<Throwable> throwingRunnable) {
        mainThreadExecutor.execute(throwingRunnable);
    }

    /**
     * Registers partitions for the scheduler without excluding any job vertex or execution
     * vertex; delegates to the three-argument overload with two empty sets.
     *
     * @param scheduler the scheduler whose execution graph is inspected
     */
    private void registerPartitions(AdaptiveBatchScheduler scheduler) {
        Set<JobVertexID> noExcludedJobVertices = Collections.emptySet();
        Set<ExecutionVertexID> noExcludedExecutionVertices = Collections.emptySet();
        this.registerPartitions(scheduler, noExcludedJobVertices, noExcludedExecutionVertices);
    }

    /**
     * Adds a {@link TestPartitionWithMetrics} to {@code allPartitionWithMetrics} for every
     * result partition in the scheduler's execution graph whose producer vertex is
     * {@link ExecutionState#FINISHED} and is not excluded by either of the given sets.
     *
     * @param scheduler the scheduler whose execution graph is inspected
     * @param unavailablePartitionsJobVertices job vertices whose partitions are skipped
     * @param unavailablePartitionsExecutionVertices execution vertices whose partitions are skipped
     */
    private void registerPartitions(AdaptiveBatchScheduler scheduler, Set<JobVertexID> unavailablePartitionsJobVertices, Set<ExecutionVertexID> unavailablePartitionsExecutionVertices) {
        ExecutionGraph graph = scheduler.getExecutionGraph();
        List<TestPartitionWithMetrics> availablePartitions = graph.getAllIntermediateResults().values().stream()
                .flatMap(intermediateResult -> Arrays.stream(intermediateResult.getPartitions()))
                .filter(candidate -> {
                    ExecutionVertex producerVertex = graph.getResultPartitionOrThrow(candidate.getPartitionId()).getProducer();
                    if (unavailablePartitionsJobVertices.contains(producerVertex.getJobvertexId())) {
                        return false;
                    }
                    if (unavailablePartitionsExecutionVertices.contains(producerVertex.getID())) {
                        return false;
                    }
                    return producerVertex.getExecutionState() == ExecutionState.FINISHED;
                })
                .map(available -> {
                    BlockingResultInfo blockingInfo = scheduler.getBlockingResultInfo(available.getIntermediateResult().getId());
                    IntermediateResultPartitionID id = available.getPartitionId();
                    Execution producingAttempt = graph.getResultPartitionOrThrow(id).getProducer().getPartitionProducer();
                    ResultPartitionID fullId = new ResultPartitionID(id, producingAttempt.getAttemptId());
                    // Zero-filled byte counts: empty when no result info exists, otherwise one slot per subpartition.
                    int subpartitionCount = blockingInfo == null ? 0 : blockingInfo.getNumSubpartitions(0);
                    ShuffleMetrics shuffleMetrics = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[subpartitionCount]));
                    return new TestPartitionWithMetrics(fullId, shuffleMetrics);
                })
                .collect(Collectors.toList());
        this.allPartitionWithMetrics.addAll(availablePartitions);
    }

    /**
     * Submits {@code startScheduling()} through {@link #runInMainThread} and then waits until
     * {@code recoveryStarted} is set and the scheduler no longer reports that it is recovering.
     *
     * @param scheduler the scheduler to start
     * @throws Exception propagated from {@code CommonTestUtils.waitUntilCondition}
     */
    private void startSchedulingAndWaitRecoverFinish(AdaptiveBatchScheduler scheduler) throws Exception {
        this.runInMainThread(() -> scheduler.startScheduling());

        SupplierWithException<Boolean, Exception> recoveryFinished = () -> {
            if (!this.recoveryStarted.get()) {
                return false;
            }
            return !scheduler.isRecovering();
        };
        CommonTestUtils.waitUntilCondition(recoveryFinished);
    }

    /**
     * Returns the internal coordinator of the first operator coordinator holder registered on
     * the given job vertex, cast to {@link SourceCoordinator}.
     *
     * @param executionGraph the graph containing the vertex
     * @param sourceID id of the job vertex to look up
     * @return the unwrapped source coordinator
     * @throws Exception propagated from {@code getInternalCoordinator()}
     */
    private static SourceCoordinator<?, ?> getInternalSourceCoordinator(ExecutionGraph executionGraph, JobVertexID sourceID) throws Exception {
        ExecutionJobVertex sourceVertex = executionGraph.getJobVertex(sourceID);
        List<OperatorCoordinatorHolder> holders = new ArrayList<OperatorCoordinatorHolder>(sourceVertex.getOperatorCoordinators());
        OperatorCoordinatorHolder firstHolder = holders.get(0);
        RecreateOnResetOperatorCoordinator recreatingCoordinator = (RecreateOnResetOperatorCoordinator)firstHolder.coordinator();
        return (SourceCoordinator<?, ?>)recreatingCoordinator.getInternalCoordinator();
    }

    /**
     * Collects the ids of all results consumed by the given vertex, in iteration order.
     *
     * @param schedulingTopology the topology used to look up the vertex
     * @param executionVertexId id of the consuming vertex
     * @return a new list holding the consumed result partition ids
     */
    private static List<IntermediateResultPartitionID> getConsumedResultPartitions(SchedulingTopology schedulingTopology, ExecutionVertexID executionVertexId) {
        List<IntermediateResultPartitionID> consumedIds = new ArrayList<IntermediateResultPartitionID>();
        schedulingTopology.getVertex(executionVertexId).getConsumedResults().forEach(consumed -> consumedIds.add(consumed.getId()));
        return consumedIds;
    }

    /**
     * Resolves {@code jobVertexID} to its {@link JobVertex} through the scheduler and forwards
     * to {@code AdaptiveBatchSchedulerTest.transitionExecutionsState}.
     *
     * @param scheduler the scheduler used for the lookup and passed on to the delegate
     * @param state the target execution state
     * @param jobVertexID id of the job vertex to resolve
     */
    public static void transitionExecutionsState(SchedulerBase scheduler, ExecutionState state, JobVertexID jobVertexID) {
        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexID);
        JobVertex targetVertex = executionJobVertex.getJobVertex();
        AdaptiveBatchSchedulerTest.transitionExecutionsState(scheduler, state, targetVertex);
    }

    /**
     * Builds the default source -> middle -> sink job graph. Both edges use
     * {@link ResultPartitionType#BLOCKING} when {@code isBlockingShuffle} is set and
     * {@link ResultPartitionType#HYBRID_FULL} otherwise.
     *
     * @return the new job graph
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private JobGraph createDefaultJobGraph() throws IOException {
        ResultPartitionType resultPartitionType = this.isBlockingShuffle ? ResultPartitionType.BLOCKING : ResultPartitionType.HYBRID_FULL;
        return this.createSourceMiddleSinkJobGraph(resultPartitionType);
    }

    /**
     * Builds the default source -> middle -> sink job graph with both edges using
     * {@link ResultPartitionType#HYBRID_FULL}, regardless of {@code isBlockingShuffle}.
     *
     * @return the new job graph
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private JobGraph createDefaultHybridJobGraph() throws IOException {
        return this.createSourceMiddleSinkJobGraph(ResultPartitionType.HYBRID_FULL);
    }

    /**
     * Shared builder for the two default job graphs, which previously duplicated this code.
     *
     * <p>The source (parallelism 5, carrying the serialized {@code provider} as operator
     * coordinator) feeds the middle vertex (parallelism 5) pointwise, and the middle vertex
     * feeds the sink all-to-all. No parallelism is set on the sink.
     *
     * @param resultPartitionType partition type used for both edges
     * @return the new job graph
     * @throws IOException if the coordinator provider cannot be serialized
     */
    private JobGraph createSourceMiddleSinkJobGraph(ResultPartitionType resultPartitionType) throws IOException {
        ArrayList<JobVertex> jobVertices = new ArrayList<JobVertex>();

        JobVertex source = new JobVertex("source", SOURCE_ID);
        source.setInvokableClass(NoOpInvokable.class);
        source.addOperatorCoordinator(new SerializedValue<>(this.provider));
        source.setParallelism(5);
        jobVertices.add(source);

        JobVertex middle = new JobVertex("middle", MIDDLE_ID);
        middle.setInvokableClass(NoOpInvokable.class);
        middle.setParallelism(5);
        jobVertices.add(middle);

        JobVertex sink = new JobVertex("sink", SINK_ID);
        sink.setInvokableClass(NoOpInvokable.class);
        jobVertices.add(sink);

        JobVertexConnectionUtils.connectNewDataSetAsInput(middle, source, DistributionPattern.POINTWISE, resultPartitionType);
        JobVertexConnectionUtils.connectNewDataSetAsInput(sink, middle, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        return new JobGraph(JOB_ID, "TestJob", jobVertices.toArray(new JobVertex[0]));
    }

    /**
     * Returns the execution vertex at position {@code subtask} among the task vertices of the
     * given job vertex.
     *
     * @param jobVertexId id of the job vertex
     * @param subtask index into the job vertex's task vertices
     * @param executionGraph the graph containing the job vertex
     * @return the execution vertex at that index
     */
    private static ExecutionVertex getExecutionVertex(JobVertexID jobVertexId, int subtask, ExecutionGraph executionGraph) {
        List<ExecutionVertex> taskVertices = BatchJobRecoveryTest.getExecutionVertices(jobVertexId, executionGraph);
        return taskVertices.get(subtask);
    }

    /**
     * Returns the task vertices of the given job vertex as a list view over the array
     * returned by {@code getTaskVertices()}.
     *
     * @param jobVertexId id of the job vertex
     * @param executionGraph the graph containing the job vertex
     * @return the job vertex's execution vertices
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} when
     *     the job vertex is not initialized -- TODO confirm
     */
    private static List<ExecutionVertex> getExecutionVertices(JobVertexID jobVertexId, ExecutionGraph executionGraph) {
        ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
        Preconditions.checkState(jobVertex.isInitialized());
        ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
        return Arrays.asList(taskVertices);
    }

    /**
     * Returns the attempt id of the current execution attempt of every task vertex of
     * {@code jobVertex}, in task-vertex order.
     *
     * @param jobVertex the job vertex to inspect
     * @return a new list of current attempt ids
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} when
     *     the job vertex is not initialized -- TODO confirm
     */
    private static List<ExecutionAttemptID> getCurrentAttemptIds(ExecutionJobVertex jobVertex) {
        Preconditions.checkState(jobVertex.isInitialized());
        ExecutionVertex[] taskVertices = jobVertex.getTaskVertices();
        List<ExecutionAttemptID> attemptIds = new ArrayList<ExecutionAttemptID>(taskVertices.length);
        for (ExecutionVertex taskVertex : taskVertices) {
            attemptIds.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        return attemptIds;
    }

    /**
     * Creates a scheduler for {@code jobGraph} using this test's {@code jobEventStore} and the
     * default values of {@code ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM} and
     * {@code JOB_RECOVERY_SNAPSHOT_MIN_PAUSE}.
     *
     * @param jobGraph the job graph to schedule
     * @return the new scheduler
     * @throws Exception propagated from the four-argument overload
     */
    private AdaptiveBatchScheduler createScheduler(JobGraph jobGraph) throws Exception {
        JobEventStore eventStore = this.jobEventStore;
        Integer maxParallelism = (Integer)BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue();
        Duration snapshotMinPause = (Duration)BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE.defaultValue();
        return this.createScheduler(jobGraph, eventStore, maxParallelism, snapshotMinPause);
    }

    /**
     * Creates a scheduler for {@code jobGraph} using this test's {@code jobEventStore}, the
     * default value of {@code ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM}, and the given
     * snapshot pause.
     *
     * @param jobGraph the job graph to schedule
     * @param jobRecoverySnapshotMinPause value for {@code JOB_RECOVERY_SNAPSHOT_MIN_PAUSE}
     * @return the new scheduler
     * @throws Exception propagated from the four-argument overload
     */
    private AdaptiveBatchScheduler createScheduler(JobGraph jobGraph, Duration jobRecoverySnapshotMinPause) throws Exception {
        JobEventStore eventStore = this.jobEventStore;
        Integer maxParallelism = (Integer)BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM.defaultValue();
        return this.createScheduler(jobGraph, eventStore, maxParallelism, jobRecoverySnapshotMinPause);
    }

    /**
     * Builds an {@link AdaptiveBatchScheduler} with job recovery enabled.
     *
     * <p>The job master configuration carries the given snapshot pause and this test's
     * {@code previousWorkerRecoveryTimeout}; when {@code isBlockingShuffle} is false it also
     * selects {@code ALL_EXCHANGES_HYBRID_FULL} and registers {@link DummyTierFactory} as the
     * remote tier factory. The job master gateway stub answers partition-with-metrics
     * requests with {@code allPartitionWithMetrics}.
     *
     * @param jobGraph the job graph to schedule
     * @param jobEventStore the store wrapped by the recovery handler's {@link JobEventManager}
     * @param defaultMaxParallelism passed to {@code setDefaultMaxParallelism}
     * @param jobRecoverySnapshotMinPause value for {@code JOB_RECOVERY_SNAPSHOT_MIN_PAUSE}
     * @return the new scheduler
     * @throws Exception if building the scheduler or its collaborators fails
     */
    private AdaptiveBatchScheduler createScheduler(JobGraph jobGraph, JobEventStore jobEventStore, int defaultMaxParallelism, Duration jobRecoverySnapshotMinPause) throws Exception {
        Configuration config = new Configuration();
        config.set(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE, jobRecoverySnapshotMinPause);
        config.set(BatchExecutionOptions.JOB_RECOVERY_ENABLED, true);
        config.set(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT, this.previousWorkerRecoveryTimeout);
        boolean hybridShuffle = !this.isBlockingShuffle;
        if (hybridShuffle) {
            config.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
            config.set(NettyShuffleEnvironmentOptions.NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME, DummyTierFactory.class.getName());
        }

        NettyShuffleMaster nettyShuffleMaster = new NettyShuffleMaster(new ShuffleMasterContextImpl(config, fatalError -> {}));
        TestingJobMasterGateway gateway = new TestingJobMasterGatewayBuilder()
                .setGetPartitionWithMetricsFunction((timeout, requestedPartitions) -> CompletableFuture.completedFuture(this.allPartitionWithMetrics))
                .build();
        nettyShuffleMaster.registerJob(new JobShuffleContextImpl(jobGraph.getJobID(), gateway));
        JobMasterPartitionTrackerImpl tracker = new JobMasterPartitionTrackerImpl(jobGraph.getJobID(), nettyShuffleMaster, ignored -> Optional.empty());

        DefaultSchedulerBuilder builder = new DefaultSchedulerBuilder(jobGraph, this.mainThreadExecutor.getMainThreadExecutor(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor())
                .setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(10, 0L).create())
                .setShuffleMaster(nettyShuffleMaster)
                .setJobMasterConfiguration(config)
                .setPartitionTracker(tracker)
                .setDelayExecutor(this.delayedExecutor)
                .setJobRecoveryHandler(new DefaultBatchJobRecoveryHandler(new JobEventManager(jobEventStore), config, jobGraph.getJobID()))
                .setVertexParallelismAndInputInfosDecider(DefaultSchedulerBuilder.createCustomParallelismDecider(2))
                .setDefaultMaxParallelism(defaultMaxParallelism);
        return builder.buildAdaptiveBatchJobScheduler(this.enableSpeculativeExecution);
    }

    /**
     * Serializes the job graph with standard Java serialization.
     *
     * <p>The {@link ObjectOutputStream} is closed, and thereby flushed, before the byte array
     * is read, so the result does not depend on when the stream drains its internal buffer.
     *
     * @param jobGraph the job graph to serialize
     * @return the serialized bytes
     * @throws IOException if writing the object fails
     */
    private byte[] serializeJobGraph(JobGraph jobGraph) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try (ObjectOutputStream oss = new ObjectOutputStream(byteArrayOutputStream)) {
            oss.writeObject(jobGraph);
        }
        // Read the bytes only after the object stream has been closed.
        return byteArrayOutputStream.toByteArray();
    }

    /**
     * Reads a {@link JobGraph} back from bytes written with standard Java serialization.
     * The {@link ObjectInputStream} is closed once the object has been read.
     *
     * @param serializedJobGraph the serialized bytes
     * @return the deserialized job graph
     * @throws Exception if the stream cannot be read or a class cannot be resolved
     */
    private JobGraph deserializeJobGraph(byte[] serializedJobGraph) throws Exception {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(serializedJobGraph))) {
            return (JobGraph)ois.readObject();
        }
    }

    private static class TestingFileSystemJobEventStore
    extends FileSystemJobEventStore {
        private final List<JobEvent> persistedJobEventList;
        private final AtomicBoolean recoveryStarted;

        public TestingFileSystemJobEventStore(Path workingDir, Configuration configuration, List<JobEvent> persistedJobEventList, AtomicBoolean recoveryStarted) throws IOException {
            super(workingDir, configuration);
            this.persistedJobEventList = persistedJobEventList;
            this.recoveryStarted = recoveryStarted;
        }

        protected void writeEventRunnable(JobEvent event, boolean cutBlock) {
            super.writeEventRunnable(event, cutBlock);
            this.persistedJobEventList.add(event);
        }

        public JobEvent readEvent() throws Exception {
            this.recoveryStarted.compareAndSet(false, true);
            return super.readEvent();
        }
    }

    private static class TestPartitionWithMetrics
    implements PartitionWithMetrics {
        private final ResultPartitionID resultPartitionID;
        private final ShuffleMetrics metrics;

        public TestPartitionWithMetrics(ResultPartitionID resultPartitionID, ShuffleMetrics metrics) {
            this.resultPartitionID = resultPartitionID;
            this.metrics = metrics;
        }

        public ShuffleMetrics getPartitionMetrics() {
            return this.metrics;
        }

        public ShuffleDescriptor getPartition() {
            return new NettyShuffleDescriptor(ResourceID.generate(), null, this.resultPartitionID){

                public Optional<ResourceID> storesLocalResourcesOn() {
                    return Optional.empty();
                }
            };
        }
    }
}

