/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.collections.iterators.IteratorChain;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestingOperatorCoordinator;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

class OperatorCoordinatorHolderTest {
    /** Failure handler handed to the holders created by this test; it stores the reported error in {@code globalFailure}. */
    private final GlobalFailureHandler globalFailureHandler = failure -> this.globalFailure = failure;

    /** Most recent failure received by {@code globalFailureHandler}; stays {@code null} until one is reported. */
    private Throwable globalFailure;

    OperatorCoordinatorHolderTest() {
    }

    /**
     * Runs after every test and rethrows any failure that was recorded in {@code globalFailure},
     * so that a test which triggered a global failure does not pass silently.
     *
     * @throws Exception the recorded failure, as rethrown by {@code ExceptionUtils.rethrowException}
     */
    @AfterEach
    void checkNoGlobalFailure() throws Exception {
        final Throwable recorded = this.globalFailure;
        if (recorded == null) {
            return;
        }
        ExceptionUtils.rethrowException(recorded);
    }

    /**
     * Triggers a coordinator checkpoint through the holder and checks that the future handed to
     * the holder is still pending as long as nothing completes the checkpoint.
     */
    @Test
    void checkpointFutureInitiallyNotDone() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointFuture);

        Assertions.assertThat(checkpointFuture).isNotDone();
    }

    /**
     * Completes the coordinator's own checkpoint future with a payload and checks that the
     * future handed to the holder completes with that same payload.
     */
    @Test
    void completedCheckpointFuture() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);
        final byte[] testData = {11, 22, 33, 44};

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(9L, checkpointFuture);
        // The coordinator finishes its snapshot; the holder is expected to forward the result.
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getLastTriggeredCheckpoint()
                .complete(testData);

        Assertions.assertThat(checkpointFuture).isDone();
        FlinkAssertions.assertThatFuture(checkpointFuture).eventuallySucceeds().isEqualTo(testData);
    }

    /**
     * Sends an event to subtask 1 while the coordinator's checkpoint future is still pending and
     * checks that the event reaches the task.
     */
    @Test
    void eventsBeforeCheckpointFutureCompletionPassThrough() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // Checkpoint 1 is triggered but never completed by the coordinator.
        holder.checkpointCoordinator(1L, new CompletableFuture<>());
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(1)
                .sendEvent(new TestOperatorEvent(1));
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(1L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(1))
                .containsExactly(new TestOperatorEvent(1));
    }

    /**
     * Sends an event after the coordinator has completed its checkpoint future and checks that
     * no event has been delivered to any task.
     */
    @Test
    void eventsAreBlockedAfterCheckpointFutureCompletes() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 10L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(1337));

        Assertions.assertThat((int) tasks.getNumberOfSentEvents()).isZero();
    }

    /**
     * Sends an event after a completed checkpoint trigger, then aborts the current triggering
     * and checks that the event has been delivered to subtask 0.
     */
    @Test
    void abortedCheckpointReleasesBlockedEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 123L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(1337));
        holder.abortCurrentTriggering();

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(new TestOperatorEvent(1337));
    }

    /**
     * Sends an event after a completed checkpoint trigger, then delivers the matching
     * {@code AcknowledgeCheckpointEvent} from subtask 0 and checks that the event has been
     * delivered to that subtask.
     */
    @Test
    void acknowledgeCheckpointEventReleasesBlockedEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1111L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(1337));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1111L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(new TestOperatorEvent(1337));
    }

    /**
     * Resets the holder to a checkpoint after a completed checkpoint trigger and checks that an
     * event sent afterwards is delivered to subtask 1.
     */
    @Test
    void restoreOpensGatewayEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        triggerAndCompleteCheckpoint(holder, 1000L);
        holder.resetToCheckpoint(1L, new byte[0]);
        // The coordinator is looked up only after the reset, exactly as before.
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(1)
                .sendEvent(new TestOperatorEvent(999));

        Assertions.assertThat(tasks.getSentEventsForSubtask(1))
                .containsExactly(new TestOperatorEvent(999));
    }

    /**
     * Completes the coordinator future of an already aborted checkpoint (1000) after a later
     * checkpoint (1010) was triggered, completed and acknowledged by subtask 0, and checks that
     * an event sent afterwards is still delivered.
     */
    @Test
    void lateCompleteCheckpointFutureDoesNotBlockEvents() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // First checkpoint: triggered, its coordinator future kept aside, then aborted.
        final CompletableFuture<byte[]> holderFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(1000L, holderFuture);
        final CompletableFuture<byte[]> staleCoordinatorFuture =
                OperatorCoordinatorHolderTest.getCoordinator(holder).getLastTriggeredCheckpoint();
        holder.abortCurrentTriggering();

        // Second checkpoint: triggered, completed and acknowledged by subtask 0.
        triggerAndCompleteCheckpoint(holder, 1010L);
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1010L));

        // Only now does the future of the aborted checkpoint complete.
        staleCoordinatorFuture.complete(new byte[0]);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(123));

        Assertions.assertThat(tasks.events)
                .containsExactly(
                        new EventReceivingTasks.EventWithSubtask(new TestOperatorEvent(123), 0));
    }

    /**
     * Runs two consecutive checkpoints (22 and 23), each acknowledged by subtasks 0, 1 and 2,
     * with events sent to subtask 0 before and after each trigger, and checks that subtask 0
     * received events 0..3 in sending order.
     */
    @Test
    void takeCheckpointAfterSuccessfulCheckpoint() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // --- first checkpoint ---
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));
        triggerAndCompleteCheckpoint(holder, 22L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(1));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(22L));
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(22L));
        holder.handleEventFromOperator(2, 0, new AcknowledgeCheckpointEvent(22L));

        // --- second checkpoint ---
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(2));
        triggerAndCompleteCheckpoint(holder, 23L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(3));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(23L));
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(23L));
        holder.handleEventFromOperator(2, 0, new AcknowledgeCheckpointEvent(23L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(
                        new TestOperatorEvent(0),
                        new TestOperatorEvent(1),
                        new TestOperatorEvent(2),
                        new TestOperatorEvent(3));
    }

    /**
     * Aborts checkpoint 22 after it was triggered and completed, then runs checkpoint 23 which is
     * acknowledged by subtask 0, and checks that subtask 0 received events 0..3 in sending order.
     */
    @Test
    void takeCheckpointAfterAbortedCheckpoint() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        // --- checkpoint that gets aborted ---
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));
        triggerAndCompleteCheckpoint(holder, 22L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(1));
        holder.abortCurrentTriggering();

        // --- follow-up checkpoint ---
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(2));
        triggerAndCompleteCheckpoint(holder, 23L);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(3));
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(23L));

        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(
                        new TestOperatorEvent(0),
                        new TestOperatorEvent(1),
                        new TestOperatorEvent(2),
                        new TestOperatorEvent(3));
    }

    /**
     * Uses a coordinator that calls {@code context.failJob(...)} for every incoming operator
     * event and checks how those requests reach the global failure handler:
     *
     * <ol>
     *   <li>the first event produces a recorded global failure,
     *   <li>a second event before any reset leaves the recorded failure unchanged,
     *   <li>after {@code resetToCheckpoint} a further event produces a different failure.
     * </ol>
     *
     * <p>The recorded failure is cleared at the end so that the {@code @AfterEach} check does not
     * rethrow the failure this test provoked on purpose.
     */
    @Test
    void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exception {
        // A plain anonymous subclass capturing the lambda parameter; the decompiler had rendered
        // the capture as a synthetic field plus a misplaced super(...) call, which is not valid Java.
        Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorProvider =
                context ->
                        new TestingOperatorCoordinator(context) {
                            @Override
                            public void handleEventFromOperator(
                                    int subtask, int attemptNumber, OperatorEvent event) {
                                context.failJob(new RuntimeException("Artificial Exception"));
                            }
                        };
        EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        OperatorCoordinatorHolder holder = this.createCoordinatorHolder(tasks, coordinatorProvider);

        // First failure request is expected to be propagated.
        holder.handleEventFromOperator(0, 0, new TestOperatorEvent());
        Assertions.assertThat(this.globalFailure).isNotNull();
        Throwable firstGlobalFailure = this.globalFailure;

        // Second request without a reset in between must not replace the recorded failure.
        holder.handleEventFromOperator(1, 0, new TestOperatorEvent());
        Assertions.assertThat(firstGlobalFailure)
                .as(
                        "The global failure should be the same instance because the context "
                                + "should only take the first request from the coordinator to fail the job.")
                .isEqualTo(this.globalFailure);

        // After a reset, a new request is expected to be propagated again.
        holder.resetToCheckpoint(0L, new byte[0]);
        holder.handleEventFromOperator(1, 1, new TestOperatorEvent());
        Assertions.assertThat(firstGlobalFailure)
                .as("The new failures should be propagated after the coordinator is reset.")
                .isNotEqualTo(this.globalFailure);

        // Reset the state so the @AfterEach hook does not fail the test.
        this.globalFailure = null;
    }

    /**
     * Uses tasks whose RPC result is a future controlled by the test, sends one event before the
     * checkpoint, and checks that the holder's checkpoint future stays pending until that RPC
     * future completes.
     */
    @Test
    void checkpointCompletionWaitsForEventFutures() throws Exception {
        final CompletableFuture<Acknowledge> ackFuture = new CompletableFuture<>();
        final EventReceivingTasks tasks =
                EventReceivingTasks.createForRunningTasksWithRpcResult(ackFuture);
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));
        final CompletableFuture<byte[]> checkpointFuture =
                triggerAndCompleteCheckpoint(holder, 22L);

        // The event's acknowledgement is still outstanding.
        Assertions.assertThat(checkpointFuture).isNotDone();

        ackFuture.complete(Acknowledge.get());
        Assertions.assertThat(checkpointFuture).isDone();
    }

    /** Runs the shared event/checkpoint consistency scenario with {@code FutureCompletedInstantlyTestCoordinator}. */
    @Test
    void verifyCheckpointEventOrderWhenCheckpointFutureCompletedImmediately() throws Exception {
        final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor =
                FutureCompletedInstantlyTestCoordinator::new;
        checkpointEventValueAtomicity(coordinatorCtor);
    }

    /** Runs the shared event/checkpoint consistency scenario with {@code FutureCompletedAfterSendingEventsCoordinator}. */
    @Test
    void verifyCheckpointEventOrderWhenCheckpointFutureCompletesLate() throws Exception {
        final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor =
                FutureCompletedAfterSendingEventsCoordinator::new;
        checkpointEventValueAtomicity(coordinatorCtor);
    }

    /**
     * Shared scenario for the event-order tests: a coordinator created by {@code coordinatorCtor}
     * sends events while a checkpoint is taken. After the holder is closed, the integer stored in
     * the checkpoint must equal the number of events delivered to the tasks, and the i-th
     * delivered event must carry the value {@code i}.
     *
     * @param coordinatorCtor factory for the coordinator under test
     * @throws Exception if the holder or the coordinator fails
     */
    private void checkpointEventValueAtomicity(Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
        final ManuallyTriggeredScheduledExecutorService executor =
                new ManuallyTriggeredScheduledExecutorService();
        final ComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter(executor, Thread.currentThread());
        final EventReceivingTasks sender = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor);

        // Random pauses vary how far the coordinator has progressed when the checkpoint starts.
        Thread.sleep(new Random().nextInt(10));
        executor.triggerAll();

        final CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<>();
        holder.checkpointCoordinator(0L, checkpointFuture);
        executor.triggerAll();

        Thread.sleep(new Random().nextInt(10));
        holder.close();
        executor.triggerAll();

        Assertions.assertThat(checkpointFuture).isDone();
        final int checkpointedNumber =
                OperatorCoordinatorHolderTest.bytesToInt(checkpointFuture.get());
        Assertions.assertThat((int) sender.getNumberOfSentEvents()).isEqualTo(checkpointedNumber);

        int index = 0;
        while (index < checkpointedNumber) {
            final TestOperatorEvent delivered =
                    (TestOperatorEvent) sender.getAllSentEvents().get(index).event;
            Assertions.assertThat((int) delivered.getValue()).isEqualTo(index);
            index++;
        }
    }

    /**
     * Fails the RPC result of a previously sent event after the checkpoint was triggered and
     * completed by the coordinator, and checks that the holder's checkpoint future completes
     * exceptionally.
     */
    @Test
    void testCheckpointFailsIfSendingEventFailedAfterTrigger() throws Exception {
        final CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
        final EventReceivingTasks tasks =
                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));

        final CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointResult);
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getLastTriggeredCheckpoint()
                .complete(new byte[0]);

        // The event sent before the checkpoint fails only now.
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));

        Assertions.assertThat(checkpointResult).isCompletedExceptionally();
    }

    /**
     * Fails the RPC result of a sent event before the checkpoint is triggered, while the executor
     * holds back every runnable submitted during that failure. The checkpoint future must stay
     * pending until the held-back runnables are executed, and must then be completed
     * exceptionally.
     */
    @Test
    void testCheckpointFailsIfSendingEventFailedBeforeTrigger() throws Exception {
        final ReorderableManualExecutorService executor = new ReorderableManualExecutorService();
        final ComponentMainThreadExecutorServiceAdapter mainThreadExecutor =
                new ComponentMainThreadExecutorServiceAdapter(executor, Thread.currentThread());
        final CompletableFuture<Acknowledge> eventSendingResult = new CompletableFuture<>();
        final EventReceivingTasks tasks =
                EventReceivingTasks.createForRunningTasksWithRpcResult(eventSendingResult);
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new, mainThreadExecutor);

        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));
        executor.triggerAll();

        // Anything submitted to the executor while the send result fails is parked.
        executor.setDelayNewRunnables(true);
        eventSendingResult.completeExceptionally(new RuntimeException("Artificial"));
        executor.setDelayNewRunnables(false);

        final CompletableFuture<byte[]> checkpointResult = new CompletableFuture<>();
        holder.checkpointCoordinator(1L, checkpointResult);
        executor.triggerAll();
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getLastTriggeredCheckpoint()
                .complete(new byte[0]);
        executor.triggerAll();

        // The parked runnables have not run yet, so the checkpoint is still undecided.
        Assertions.assertThat(checkpointResult).isNotDone();

        executor.executeAllDelayedRunnables();
        executor.triggerAll();
        Assertions.assertThat(checkpointResult).isCompletedExceptionally();
    }

    /**
     * After a completed checkpoint trigger, sends one event to subtask 0 and one to subtask 1 and
     * acknowledges the checkpoint first from subtask 1, then from subtask 0. Each subtask's event
     * is expected to be delivered only once that subtask has acknowledged.
     */
    @Test
    void testControlGatewayAtSubtaskGranularity() throws Exception {
        final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
        final OperatorCoordinatorHolder holder =
                createCoordinatorHolder(tasks, TestingOperatorCoordinator::new);

        holder.checkpointCoordinator(1L, new CompletableFuture<>());
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getLastTriggeredCheckpoint()
                .complete(new byte[0]);

        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(0)
                .sendEvent(new TestOperatorEvent(0));
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getSubtaskGateway(1)
                .sendEvent(new TestOperatorEvent(1));

        // Only subtask 1 has acknowledged so far.
        holder.handleEventFromOperator(1, 0, new AcknowledgeCheckpointEvent(1L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0)).isEmpty();
        Assertions.assertThat(tasks.getSentEventsForSubtask(1))
                .containsExactly(new TestOperatorEvent(1));

        // Now subtask 0 acknowledges as well.
        holder.handleEventFromOperator(0, 0, new AcknowledgeCheckpointEvent(1L));
        Assertions.assertThat(tasks.getSentEventsForSubtask(0))
                .containsExactly(new TestOperatorEvent(0));
        Assertions.assertThat(tasks.getSentEventsForSubtask(1))
                .containsExactly(new TestOperatorEvent(1));
    }

    /**
     * Triggers a checkpoint on the holder and immediately completes the coordinator's own
     * checkpoint future with an empty byte array.
     *
     * @param holder the holder to checkpoint
     * @param checkpointId the id of the checkpoint to trigger
     * @return the future that was handed to the holder for this checkpoint
     * @throws Exception if triggering the checkpoint fails
     */
    private CompletableFuture<byte[]> triggerAndCompleteCheckpoint(OperatorCoordinatorHolder holder, long checkpointId) throws Exception {
        final CompletableFuture<byte[]> holderResult = new CompletableFuture<>();
        holder.checkpointCoordinator(checkpointId, holderResult);

        final byte[] emptyState = new byte[0];
        OperatorCoordinatorHolderTest.getCoordinator(holder)
                .getLastTriggeredCheckpoint()
                .complete(emptyState);
        return holderResult;
    }

    /**
     * Encodes an {@code int} as four bytes using {@link ByteBuffer}'s default (big-endian) byte
     * order.
     *
     * @param value the number to encode
     * @return a new four-byte array holding {@code value}
     */
    static byte[] intToBytes(int value) {
        final ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(value);
        return buffer.array();
    }

    /**
     * Decodes the first four bytes of the given array as an {@code int} using {@link ByteBuffer}'s
     * default (big-endian) byte order.
     *
     * @param bytes the encoded number; must contain at least four bytes
     * @return the decoded value
     */
    static int bytesToInt(byte[] bytes) {
        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return buffer.getInt();
    }

    /**
     * Returns the coordinator managed by the given holder, cast to the testing implementation.
     *
     * @param holder the holder whose coordinator is requested
     * @return the holder's coordinator as a {@code TestingOperatorCoordinator}
     */
    private static TestingOperatorCoordinator getCoordinator(OperatorCoordinatorHolder holder) {
        final OperatorCoordinator coordinator = holder.coordinator();
        return (TestingOperatorCoordinator) coordinator;
    }

    /**
     * Creates a started holder whose main-thread executor is the one returned by
     * {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()}.
     *
     * @param eventTarget factory giving the holder access to the (fake) subtasks
     * @param coordinatorCtor factory for the coordinator wrapped by the holder
     * @return the initialized and started holder
     * @throws Exception if creating or starting the holder fails
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory eventTarget, Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor) throws Exception {
        final ComponentMainThreadExecutor mainThread =
                ComponentMainThreadExecutorServiceAdapter.forMainThread();
        return createCoordinatorHolder(eventTarget, coordinatorCtor, mainThread);
    }

    /**
     * Builds an {@code OperatorCoordinatorHolder} for a fresh operator id, initializes it with
     * this test's global failure handler, the given main-thread executor and a newly created
     * {@code CheckpointCoordinator}, and starts it.
     *
     * @param eventTarget factory giving the holder access to the (fake) subtasks
     * @param coordinatorCtor factory for the coordinator wrapped by the holder
     * @param mainThreadExecutor executor used as the holder's main thread and as executor/timer
     *     of the checkpoint coordinator
     * @return the initialized and started holder
     * @throws Exception if creating, initializing or starting the holder fails
     */
    private OperatorCoordinatorHolder createCoordinatorHolder(SubtaskAccess.SubtaskAccessFactory eventTarget, final Function<OperatorCoordinator.Context, OperatorCoordinator> coordinatorCtor, ComponentMainThreadExecutor mainThreadExecutor) throws Exception {
        final OperatorID operatorId = new OperatorID();
        final OperatorCoordinator.Provider coordinatorProvider = new OperatorCoordinator.Provider(){

            public OperatorID getOperatorId() {
                return operatorId;
            }

            public OperatorCoordinator create(OperatorCoordinator.Context context) {
                return coordinatorCtor.apply(context);
            }
        };

        final TaskInformation taskInformation =
                new TaskInformation(
                        new JobVertexID(),
                        "test task",
                        1,
                        1,
                        NoOpInvokable.class.getName(),
                        new Configuration());

        // NOTE(review): 3 and 1775 look like parallelism / max parallelism -- confirm against
        // OperatorCoordinatorHolder.create.
        final OperatorCoordinatorHolder holder =
                OperatorCoordinatorHolder.create(
                        operatorId,
                        coordinatorProvider,
                        new CoordinatorStoreImpl(),
                        "test-coordinator-name",
                        getClass().getClassLoader(),
                        3,
                        1775,
                        eventTarget,
                        false,
                        taskInformation,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

        final JobID jobId = new JobID();
        // Minimal plan-calculator context: no main executor and no finished tasks.
        final CheckpointPlanCalculatorContext planContext = new CheckpointPlanCalculatorContext(){

            public ScheduledExecutor getMainExecutor() {
                return null;
            }

            public boolean hasFinishedTasks() {
                return false;
            }
        };
        final CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinator(
                        jobId,
                        CheckpointCoordinatorConfiguration.builder().build(),
                        Collections.emptyList(),
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(10),
                        new JobManagerCheckpointStorage(),
                        mainThreadExecutor,
                        new CheckpointsCleaner(),
                        mainThreadExecutor,
                        new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
                        new DefaultCheckpointPlanCalculator(
                                jobId, planContext, IteratorChain::new, false),
                        NoOpCheckpointStatsTracker.INSTANCE);

        holder.lazyInitialize(this.globalFailureHandler, mainThreadExecutor, checkpointCoordinator);
        holder.start();
        return holder;
    }

    /**
     * Manually triggered executor that can additionally park submitted runnables in a separate
     * queue. While delaying is switched on, {@link #execute(Runnable)} only queues the runnable;
     * {@link #executeAllDelayedRunnables()} later hands the parked runnables to the parent
     * executor in submission order.
     */
    private static class ReorderableManualExecutorService
    extends ManuallyTriggeredScheduledExecutorService {
        /** Whether newly submitted runnables are parked instead of passed to the parent. */
        private boolean delayNewRunnables;
        /** Runnables parked while {@code delayNewRunnables} was set, oldest first. */
        private final Queue<Runnable> delayedRunnables = new ArrayDeque<>();

        private ReorderableManualExecutorService() {
        }

        /** Switches parking of newly submitted runnables on or off. */
        public void setDelayNewRunnables(boolean delayNewRunnables) {
            this.delayNewRunnables = delayNewRunnables;
        }

        /** Parks the runnable while delaying is on; otherwise forwards it to the parent executor. */
        @Override
        public void execute(@Nonnull Runnable command) {
            if (!this.delayNewRunnables) {
                super.execute(command);
                return;
            }
            this.delayedRunnables.add(command);
        }

        /** Drains the parked runnables into the parent executor, bypassing the delay switch. */
        public void executeAllDelayedRunnables() {
            Runnable next;
            // ArrayDeque holds no null elements, so a null from poll() means the queue is empty.
            while ((next = this.delayedRunnables.poll()) != null) {
                super.execute(next);
            }
        }
    }

    /**
     * Base class for test coordinators that send events from their own thread. The thread is
     * started once a gateway has been registered for every subtask and then calls
     * {@link #step()} repeatedly until the coordinator is closed.
     */
    private static abstract class CheckpointEventOrderTestBaseCoordinator
    implements OperatorCoordinator,
    Runnable {
        /** Thread executing {@link #run()}. */
        private final Thread coordinatorThread;
        protected final OperatorCoordinator.Context context;
        /** One slot per subtask, sized from {@code context.currentParallelism()}. */
        protected final OperatorCoordinator.SubtaskGateway[] subtaskGateways;
        /** Set by {@link #close()}; ends the loop in {@link #run()}. */
        private volatile boolean closed;

        CheckpointEventOrderTestBaseCoordinator(OperatorCoordinator.Context context) {
            this.context = context;
            this.subtaskGateways = new OperatorCoordinator.SubtaskGateway[context.currentParallelism()];
            this.coordinatorThread = new Thread(this);
        }

        public void start() throws Exception {
        }

        /** Marks the coordinator as closed, interrupts its thread and waits for it to finish. */
        public void close() throws Exception {
            this.closed = true;
            this.coordinatorThread.interrupt();
            this.coordinatorThread.join();
        }

        public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        }

        public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
        }

        public void subtaskReset(int subtask, long checkpointId) {
        }

        /**
         * Stores the gateway for the subtask and starts the coordinator thread as soon as no
         * gateway slot is empty any more.
         */
        public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
            this.subtaskGateways[subtask] = gateway;

            boolean allRegistered = true;
            for (OperatorCoordinator.SubtaskGateway registered : this.subtaskGateways) {
                if (registered == null) {
                    allRegistered = false;
                    break;
                }
            }
            if (allRegistered) {
                this.coordinatorThread.start();
            }
        }

        public abstract void checkpointCoordinator(long var1, CompletableFuture<byte[]> var3) throws Exception;

        public void notifyCheckpointComplete(long checkpointId) {
        }

        public void resetToCheckpoint(long checkpointId, byte[] checkpointData) throws Exception {
        }

        /**
         * Calls {@link #step()} until closed. A throwable raised after closing is ignored; any
         * other throwable is printed and terminates the JVM.
         */
        @Override
        public void run() {
            try {
                while (!this.closed) {
                    this.step();
                }
            }
            catch (Throwable t) {
                if (!this.closed) {
                    t.printStackTrace();
                    System.exit(-1);
                }
            }
        }

        /** One iteration of the coordinator thread's work. */
        protected abstract void step() throws Exception;
    }

    /**
     * Coordinator that does not complete the checkpoint future when the checkpoint is triggered.
     * The future is only remembered; the coordinator thread completes it in a later
     * {@link #step()}, after that step has sent its events.
     */
    private static final class FutureCompletedAfterSendingEventsCoordinator
    extends CheckpointEventOrderTestBaseCoordinator {
        /** Triggered once a remembered checkpoint future has been completed. */
        private final OneShotLatch checkpointCompleted = new OneShotLatch();
        /** Checkpoint future waiting to be completed by the coordinator thread, if any. */
        @Nullable
        private volatile CompletableFuture<byte[]> checkpoint;
        /** Value of the next event to send; also the number of events sent so far. */
        private int num;

        FutureCompletedAfterSendingEventsCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /** Only remembers the future; completion happens in {@link #step()}. */
        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.checkpoint = result;
        }

        /**
         * Sleeps briefly, sends one consecutively numbered event to each of subtasks 0, 1 and 2,
         * and then completes a pending checkpoint (if any) with the current counter value.
         */
        @Override
        protected void step() throws Exception {
            Thread.sleep(2L);

            for (int subtask = 0; subtask < 3; subtask++) {
                this.subtaskGateways[subtask].sendEvent(new TestOperatorEvent(this.num++));
            }

            final CompletableFuture<byte[]> pending = this.checkpoint;
            if (pending == null) {
                return;
            }
            pending.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
            this.checkpointCompleted.trigger();
            this.checkpoint = null;
        }

        /** Waits until a checkpoint has been completed before shutting the thread down. */
        @Override
        public void close() throws Exception {
            this.checkpointCompleted.await();
            super.close();
        }
    }

    /**
     * Coordinator whose checkpoint future is completed by the coordinator thread while the
     * thread that triggered the checkpoint is still blocked inside
     * {@link #checkpointCoordinator(long, CompletableFuture)}.
     */
    private static final class FutureCompletedInstantlyTestCoordinator
    extends CheckpointEventOrderTestBaseCoordinator {
        // Fair lock shared by the triggering thread and the coordinator thread.
        private final ReentrantLock lock = new ReentrantLock(true);
        // Signalled by step() after it has sent its event.
        private final Condition condition = this.lock.newCondition();
        // Checkpoint future handed over by checkpointCoordinator(), cleared once completed.
        @Nullable
        @GuardedBy(value="lock")
        private CompletableFuture<byte[]> checkpoint;
        // Value of the next event to send; also the number of events sent so far.
        private int num;

        FutureCompletedInstantlyTestCoordinator(OperatorCoordinator.Context context) {
            super(context);
        }

        /*
         * Publishes the checkpoint future under the lock and then waits on the condition, so this
         * method returns only after being signalled (step() signals after sending its event).
         * NOTE(review): await() is not wrapped in a predicate loop, so a spurious wakeup would
         * let this method return early -- confirm that is acceptable for this test.
         */
        @Override
        public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
            this.lock.lock();
            try {
                this.checkpoint = result;
                this.condition.await();
            }
            finally {
                this.lock.unlock();
            }
        }

        /*
         * Under the lock: completes a pending checkpoint with the current counter value, then
         * sends the next numbered event to subtask 0 and wakes up all waiters on the condition.
         * The short sleep happens after the lock has been released.
         */
        @Override
        protected void step() throws Exception {
            this.lock.lock();
            try {
                if (this.checkpoint != null) {
                    // The checkpoint records the counter before the event below is sent.
                    this.checkpoint.complete(OperatorCoordinatorHolderTest.intToBytes(this.num));
                    this.checkpoint = null;
                }
                this.subtaskGateways[0].sendEvent((OperatorEvent)new TestOperatorEvent(this.num++));
                this.condition.signalAll();
            }
            finally {
                this.lock.unlock();
            }
            Thread.sleep(2L);
        }
    }
}

