/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class StreamTaskExecutionDecorationTest {
    private CountingStreamTaskActionExecutor decorator;
    private StreamTask<Object, StreamOperator<Object>> task;
    private TaskMailboxImpl mailbox;

    StreamTaskExecutionDecorationTest() {
    }

    @Test
    void testAbortCheckpointOnBarrierIsDecorated() throws Exception {
        this.task.abortCheckpointOnBarrier(1L, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testTriggerCheckpointOnBarrierIsDecorated() throws Exception {
        this.task.triggerCheckpointOnBarrier(new CheckpointMetaData(1L, 2L), new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, new CheckpointStorageLocationReference(new byte[]{1})), null);
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testTriggerCheckpointAsyncIsDecorated() {
        this.task.triggerCheckpointAsync(new CheckpointMetaData(1L, 2L), new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, new CheckpointStorageLocationReference(new byte[]{1})));
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.mailbox.hasMail()).as("mailbox is empty", new Object[0])).isTrue();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.decorator.wasCalled()).as("execution decorator was called preliminary", new Object[0])).isFalse();
        this.mailbox.drain().forEach(m -> {
            try {
                m.run();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @Test
    void testMailboxExecutorIsDecorated() throws Exception {
        this.task.mailboxProcessor.getMainMailboxExecutor().execute(() -> this.task.mailboxProcessor.allActionsCompleted(), "");
        this.task.mailboxProcessor.runMailboxLoop();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.decorator.wasCalled()).as("execution decorator was not called", new Object[0])).isTrue();
    }

    @BeforeEach
    void before() throws Exception {
        this.mailbox = new TaskMailboxImpl();
        this.decorator = new CountingStreamTaskActionExecutor();
        this.task = new StreamTask<Object, StreamOperator<Object>>((Environment)new DeclineDummyEnvironment(), null, (Thread.UncaughtExceptionHandler)FatalExitExceptionHandler.INSTANCE, (StreamTaskActionExecutor)this.decorator, (TaskMailbox)this.mailbox){

            protected void init() {
            }

            protected void processInput(MailboxDefaultAction.Controller controller) {
            }
        };
        this.task.operatorChain = new RegularOperatorChain(this.task, (RecordWriterDelegate)new NonRecordWriter());
    }

    @AfterEach
    void after() {
        this.decorator = null;
        this.task = null;
    }

    static class CountingStreamTaskActionExecutor
    extends StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor {
        private final AtomicInteger calls = new AtomicInteger(0);

        CountingStreamTaskActionExecutor() {
            super(new Object());
        }

        int getCallCount() {
            return this.calls.get();
        }

        boolean wasCalled() {
            return this.getCallCount() > 0;
        }

        public void run(RunnableWithException runnable) throws Exception {
            this.calls.incrementAndGet();
            runnable.run();
        }

        public <E extends Throwable> void runThrowing(ThrowingRunnable<E> runnable) throws E {
            this.calls.incrementAndGet();
            runnable.run();
        }

        public <R> R call(Callable<R> callable) throws Exception {
            this.calls.incrementAndGet();
            return callable.call();
        }
    }

    private static final class DeclineDummyEnvironment
    extends DummyEnvironment {
        DeclineDummyEnvironment() {
            super("test", 1, 0);
        }

        @Override
        public void declineCheckpoint(long checkpointId, CheckpointException cause) {
        }
    }
}

