/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.SharedResources;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.BiFunctionWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class TaskAsyncCallTest
extends TestLogger {
    /** Number of checkpoint calls each test issues; reset to 1000 before every test. */
    private static int numCalls;
    /**
     * Triggered by the invokable at the start of {@code invoke()}; the tests await it before
     * issuing any calls to the task.
     */
    private static OneShotLatch awaitLatch;
    /**
     * Triggered by the invokable once its checkpoint id reaches {@code numCalls}, or when it leaves
     * {@code invoke()} with a recorded error; the tests await it before asserting.
     */
    private static OneShotLatch triggerLatch;
    /** Shuffle environment passed to every created task; built before and closed after each test. */
    private ShuffleEnvironment<?, ?> shuffleEnvironment;

    /**
     * Resets the shared static state (call count and both latches) and builds a fresh shuffle
     * environment before each test.
     */
    @Before
    public void createQueuesAndActors() {
        // Static state is shared with the invokable, so it must be re-created for every test.
        numCalls = 1000;
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();

        final NettyShuffleEnvironmentBuilder environmentBuilder = new NettyShuffleEnvironmentBuilder();
        this.shuffleEnvironment = environmentBuilder.build();
    }

    /** Closes the shuffle environment after each test, if one is present. */
    @After
    public void teardown() throws Exception {
        if (this.shuffleEnvironment == null) {
            // Nothing to release.
            return;
        }
        this.shuffleEnvironment.close();
    }

    /**
     * Triggers {@code numCalls} checkpoint barriers with consecutive ids on a started task and
     * verifies that the task is neither canceled nor failed once the invokable has signalled the
     * test to continue.
     */
    @Test
    public void testCheckpointCallsInOrder() throws Exception {
        final Task task = this.createTask(CheckpointsInOrderInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();

            // Wait until the invokable has entered invoke().
            awaitLatch.await();

            for (long checkpointId = 1L; checkpointId <= numCalls; checkpointId++) {
                task.triggerCheckpointBarrier(
                        checkpointId,
                        156865867234L,
                        CheckpointOptions.forCheckpointWithDefaultLocation());
            }

            // Wait until the invokable has seen the last checkpoint id or has recorded an error.
            triggerLatch.await();

            Assert.assertFalse(task.isCanceledOrFailed());
            final ExecutionState stateAfterCalls = task.getExecutionState();
            Assert.assertThat(
                    stateAfterCalls,
                    Matchers.isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
        }
    }

    /**
     * Alternates checkpoint triggers and checkpoint-complete notifications for {@code numCalls}
     * consecutive ids on a started task and verifies that the task is neither canceled nor failed
     * once the invokable has signalled the test to continue.
     */
    @Test
    public void testMixedAsyncCallsInOrder() throws Exception {
        final Task task = this.createTask(CheckpointsInOrderInvokable.class);
        try (TaskCleaner ignored = new TaskCleaner(task)) {
            task.startTaskThread();

            // Wait until the invokable has entered invoke().
            awaitLatch.await();

            for (long checkpointId = 1L; checkpointId <= numCalls; checkpointId++) {
                task.triggerCheckpointBarrier(
                        checkpointId,
                        156865867234L,
                        CheckpointOptions.forCheckpointWithDefaultLocation());
                task.notifyCheckpointComplete(checkpointId);
            }

            // Wait until the invokable has seen the last checkpoint id or has recorded an error.
            triggerLatch.await();

            Assert.assertFalse(task.isCanceledOrFailed());
            final ExecutionState stateAfterCalls = task.getExecutionState();
            Assert.assertThat(
                    stateAfterCalls,
                    Matchers.isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
        }
    }

    /**
     * Builds a {@link Task} that runs the given invokable class, wired with mocks and testing
     * stand-ins for its collaborators and with this test's shuffle environment.
     *
     * @param invokableClass class of the invokable the task should run
     * @return the created task; its thread is not started by this method
     * @throws Exception if constructing any of the task's inputs fails
     */
    private Task createTask(Class<? extends AbstractInvokable> invokableClass) throws Exception {
        // Every class loader request is answered with a new TestUserCodeClassLoader.
        final BiFunctionWithException<
                        Collection<PermanentBlobKey>,
                        Collection<URL>,
                        UserCodeClassLoader,
                        IOException>
                classLoaderResolver =
                        (permanentBlobKeys, urls) ->
                                TestingUserCodeClassLoader.newBuilder()
                                        .setClassLoader(new TestUserCodeClassLoader())
                                        .build();
        final TestingClassLoaderLease classLoaderLease =
                TestingClassLoaderLease.newBuilder()
                        .setGetOrResolveClassLoaderFunction(classLoaderResolver)
                        .build();

        final PartitionProducerStateChecker producerStateChecker =
                Mockito.mock(PartitionProducerStateChecker.class);
        final Executor mockedExecutor = Mockito.mock(Executor.class);
        final TaskMetricGroup metricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();

        final JobInformation jobInformation =
                new JobInformation(
                        new JobID(),
                        JobType.STREAMING,
                        "Job Name",
                        new SerializedValue<>(new ExecutionConfig()),
                        new Configuration(),
                        Collections.emptyList(),
                        Collections.emptyList());
        final TaskInformation taskInformation =
                new TaskInformation(
                        new JobVertexID(),
                        "Test Task",
                        1,
                        1,
                        invokableClass.getName(),
                        new Configuration());

        return new Task(
                jobInformation,
                taskInformation,
                ExecutionGraphTestUtils.createExecutionAttemptId(taskInformation.getJobVertexId()),
                new AllocationID(),
                Collections.emptyList(),
                Collections.emptyList(),
                Mockito.mock(MemoryManager.class),
                new SharedResources(),
                Mockito.mock(IOManager.class),
                this.shuffleEnvironment,
                new KvStateService(new KvStateRegistry(), null, null),
                Mockito.mock(BroadcastVariableManager.class),
                new TaskEventDispatcher(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                new TestTaskStateManager(),
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(InputSplitProvider.class),
                Mockito.mock(CheckpointResponder.class),
                new NoOpTaskOperatorEventGateway(),
                new TestGlobalAggregateManager(),
                classLoaderLease,
                Mockito.mock(FileCache.class),
                new TestingTaskManagerRuntimeInfo(),
                metricGroup,
                producerStateChecker,
                mockedExecutor,
                new ChannelStateWriteRequestExecutorFactory(jobInformation.getJobId()));
    }

    /**
     * Invokable that checks that checkpoint triggers arrive with consecutive ids starting at 1 and
     * that every checkpoint-complete notification carries the id of the most recent trigger.
     *
     * <p>The first violation is stored in {@code error}; {@code invoke()} waits for that field to
     * be set and then rethrows it.
     */
    public static class CheckpointsInOrderInvokable extends AbstractInvokable {
        private volatile long lastCheckpointId = 0L;
        private volatile Exception error;

        public CheckpointsInOrderInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Signals the test that the invokable is running, then blocks on this object's monitor
         * until an ordering error has been recorded and rethrows that error.
         *
         * @throws Exception the recorded ordering error, or whatever interrupts the wait
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();

            final Object monitor = this;
            synchronized (monitor) {
                while (this.error == null) {
                    monitor.wait();
                }
            }

            final Exception failure = this.error;
            if (failure != null) {
                // Release the test thread, which awaits this latch, before failing.
                triggerLatch.trigger();
                throw failure;
            }
        }

        /**
         * Advances the expected checkpoint id by one and compares it with the id of the incoming
         * trigger. A match on the final id ({@code numCalls}) releases the test; a mismatch is
         * recorded as an ordering error.
         *
         * @return an already completed future holding {@code true}
         */
        public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            this.lastCheckpointId++;

            final boolean inOrder = checkpointMetaData.getCheckpointId() == this.lastCheckpointId;
            if (!inOrder) {
                this.recordOutOfOrderCall();
            } else if (this.lastCheckpointId == (long) numCalls) {
                triggerLatch.trigger();
            }
            return CompletableFuture.completedFuture(Boolean.TRUE);
        }

        /** Not expected in these tests; always throws {@link UnsupportedOperationException}. */
        public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetrics) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /** Not expected in these tests; always throws {@link UnsupportedOperationException}. */
        public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
            throw new UnsupportedOperationException("Should not be called");
        }

        /**
         * Records an ordering error when the notified id differs from the id of the most recent
         * checkpoint trigger.
         *
         * @return an already completed future holding {@code null}
         */
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            if (checkpointId != this.lastCheckpointId) {
                this.recordOutOfOrderCall();
            }
            return CompletableFuture.completedFuture(null);
        }

        /** Stores the ordering error, unless one is already stored, and wakes up {@code invoke()}. */
        private void recordOutOfOrderCall() {
            if (this.error != null) {
                // Keep the first recorded failure.
                return;
            }
            this.error = new Exception("calls out of order");

            final Object monitor = this;
            synchronized (monitor) {
                monitor.notifyAll();
            }
        }
    }

    /**
     * Try-with-resources helper that cancels a task and waits a bounded time for its executing
     * thread to end.
     */
    private static class TaskCleaner implements AutoCloseable {
        private final Task task;

        private TaskCleaner(Task task) {
            this.task = task;
        }

        /** Cancels the task, then waits up to five seconds for its executing thread to terminate. */
        @Override
        public void close() throws Exception {
            final long joinTimeoutMillis = 5000L;

            this.task.cancelExecution();
            final Thread executingThread = this.task.getExecutingThread();
            executingThread.join(joinTimeoutMillis);
        }
    }

    /** Class loader whose parent is the system class loader; it adds no behavior of its own. */
    private static class TestUserCodeClassLoader extends ClassLoader {
        TestUserCodeClassLoader() {
            super(getSystemClassLoader());
        }
    }
}

