/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.SharedResources;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(value=10L, unit=TimeUnit.SECONDS)
class SynchronousCheckpointITCase {
    private static final LinkedBlockingQueue<Event> EVENT_QUEUE = new LinkedBlockingQueue();

    SynchronousCheckpointITCase() {
    }

    @Test
    void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception {
        Task task = this.createTask(SynchronousCheckpointTestingTask.class);
        try (TaskCleaner ignored = new TaskCleaner(task);){
            task.startTaskThread();
            Assertions.assertThat((Comparable)((Object)EVENT_QUEUE.take())).isEqualTo((Object)Event.TASK_IS_RUNNING);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.RUNNING);
            task.triggerCheckpointBarrier(42L, 156865867234L, new CheckpointOptions((SnapshotType)SavepointType.suspend((SavepointFormatType)SavepointFormatType.CANONICAL), CheckpointStorageLocationReference.getDefault()));
            Assertions.assertThat((Comparable)((Object)EVENT_QUEUE.take())).isEqualTo((Object)Event.PRE_TRIGGER_CHECKPOINT);
            Assertions.assertThat((Comparable)((Object)EVENT_QUEUE.take())).isEqualTo((Object)Event.POST_TRIGGER_CHECKPOINT);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            task.notifyCheckpointComplete(42L);
            Assertions.assertThat((Comparable)((Object)EVENT_QUEUE.take())).isEqualTo((Object)Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
            Assertions.assertThat((Comparable)((Object)EVENT_QUEUE.take())).isEqualTo((Object)Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
            Assertions.assertThat(EVENT_QUEUE).isEmpty();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.RUNNING);
        }
    }

    private Task createTask(Class<? extends TaskInvokable> invokableClass) throws Exception {
        PartitionProducerStateChecker partitionProducerStateChecker = (PartitionProducerStateChecker)Mockito.mock(PartitionProducerStateChecker.class);
        Executor executor = (Executor)Mockito.mock(Executor.class);
        NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
        TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        JobInformation jobInformation = new JobInformation(new JobID(), JobType.STREAMING, "Job Name", new SerializedValue((Object)new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "Test Task", 1, 1, invokableClass.getName(), new Configuration());
        return new Task(jobInformation, taskInformation, ExecutionGraphTestUtils.createExecutionAttemptId(taskInformation.getJobVertexId()), new AllocationID(), Collections.emptyList(), Collections.emptyList(), (MemoryManager)Mockito.mock(MemoryManager.class), new SharedResources(), (IOManager)Mockito.mock(IOManager.class), (ShuffleEnvironment)shuffleEnvironment, new KvStateService(new KvStateRegistry(), null, null), (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, (TaskStateManager)new TestTaskStateManager(), (TaskManagerActions)Mockito.mock(TaskManagerActions.class), (InputSplitProvider)Mockito.mock(InputSplitProvider.class), (CheckpointResponder)Mockito.mock(CheckpointResponder.class), (TaskOperatorEventGateway)new NoOpTaskOperatorEventGateway(), (GlobalAggregateManager)new TestGlobalAggregateManager(), (LibraryCacheManager.ClassLoaderHandle)TestingClassLoaderLease.newBuilder().build(), (FileCache)Mockito.mock(FileCache.class), (TaskManagerRuntimeInfo)new TestingTaskManagerRuntimeInfo(), taskMetricGroup, partitionProducerStateChecker, executor, new ChannelStateWriteRequestExecutorFactory(jobInformation.getJobId()));
    }

    public static class SynchronousCheckpointTestingTask
    extends StreamTask {
        private boolean isRunning;

        public SynchronousCheckpointTestingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            if (!this.isRunning) {
                this.isRunning = true;
                EVENT_QUEUE.put(Event.TASK_IS_RUNNING);
            }
            if (this.isCanceled()) {
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
            } else {
                controller.suspendDefaultAction();
            }
        }

        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            try {
                EVENT_QUEUE.put(Event.PRE_TRIGGER_CHECKPOINT);
                CompletableFuture result = super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
                EVENT_QUEUE.put(Event.POST_TRIGGER_CHECKPOINT);
                return result;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            try {
                EVENT_QUEUE.put(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE);
                Future result = super.notifyCheckpointCompleteAsync(checkpointId);
                EVENT_QUEUE.put(Event.POST_NOTIFY_CHECKPOINT_COMPLETE);
                return result;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        public Future<Void> notifyCheckpointAbortAsync(long checkpointId, long latestCompletedCheckpointId) {
            return CompletableFuture.completedFuture(null);
        }

        protected void init() {
        }

        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) {
            throw new UnsupportedOperationException("Should not be called");
        }

        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            throw new UnsupportedOperationException("Should not be called");
        }

        protected void cleanUpInternal() {
        }
    }

    private static class TaskCleaner
    implements AutoCloseable {
        private final Task task;

        private TaskCleaner(Task task) {
            this.task = task;
        }

        @Override
        public void close() throws Exception {
            this.task.cancelExecution();
            this.task.getExecutingThread().join(5000L);
        }
    }

    private static enum Event {
        TASK_IS_RUNNING,
        PRE_TRIGGER_CHECKPOINT,
        PRE_NOTIFY_CHECKPOINT_COMPLETE,
        POST_NOTIFY_CHECKPOINT_COMPLETE,
        POST_TRIGGER_CHECKPOINT;

    }
}

