/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

@ExtendWith(value={TestLoggerExtension.class})
public class TaskCancelAsyncProducerConsumerITCase {
    /** Provides the executor service the test wraps to schedule its job-status polling retries. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    // The four fields below are written from the task side (the invokables and the threads they
    // spawn) and read by the test method, which is why they are static and volatile.
    /** Exception caught by the producer thread's emit loop; the test asserts on its type. */
    private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
    /** Exception caught by the consumer thread's read loop; the test asserts on its type. */
    private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
    /** Producer thread, published by AsyncProducer.invoke() so the test can inspect its stack trace. */
    private static volatile Thread ASYNC_PRODUCER_THREAD;
    /** Consumer thread, published by AsyncConsumer.invoke() so the test can inspect its thread state. */
    private static volatile Thread ASYNC_CONSUMER_THREAD;
    /**
     * Mini cluster extension; assigned in the static initializer using getFlinkConfiguration().
     * NOTE(review): presumably the source of the MiniCluster injected into the test method - confirm.
     */
    @RegisterExtension
    private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE;

    /**
     * Builds the Flink configuration used by the mini cluster of this test.
     *
     * <p>The only option set is {@link TaskManagerOptions#MEMORY_SEGMENT_SIZE}, which is set to
     * the size parsed from the string {@code "4096"}.
     *
     * @return a new configuration carrying the memory segment size override
     */
    private static Configuration getFlinkConfiguration() {
        Configuration config = new Configuration();
        // Pass the MemorySize directly: widening it to Object (a decompiler artifact) no longer
        // matches the option's value type in the generic set(ConfigOption<T>, T) signature.
        config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
        return config;
    }

    /**
     * Verifies that a job can be cancelled while its producer and consumer run in threads of
     * their own, with the producer stuck in a blocking buffer request and the consumer thread in
     * state {@code WAITING}.
     *
     * <p>The test submits a two-vertex pipelined job, waits until it is {@code RUNNING}, polls
     * until both async threads are in the expected blocked state, cancels the job, waits until it
     * is {@code CANCELED} and finally checks the type of the exception each async thread recorded.
     *
     * @param flink mini cluster the job is submitted to
     * @throws Exception if submitting, polling or cancelling the job fails or times out
     */
    @Test
    public void testCancelAsyncProducerAndConsumer(@InjectMiniCluster MiniCluster flink) throws Exception {
        // Clear whatever an earlier run in the same JVM left behind, so that the assertions at the
        // end cannot pass on stale exceptions.
        ASYNC_PRODUCER_EXCEPTION = null;
        ASYNC_CONSUMER_EXCEPTION = null;
        ASYNC_PRODUCER_THREAD = null;
        ASYNC_CONSUMER_THREAD = null;

        Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2L));

        JobVertex producer = new JobVertex("AsyncProducer");
        producer.setParallelism(1);
        producer.setInvokableClass(AsyncProducer.class);

        JobVertex consumer = new JobVertex("AsyncConsumer");
        consumer.setParallelism(1);
        consumer.setInvokableClass(AsyncConsumer.class);
        JobVertexConnectionUtils.connectNewDataSetAsInput(consumer, producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // Both vertices share one slot sharing group.
        SlotSharingGroup slot = new SlotSharingGroup();
        producer.setSlotSharingGroup(slot);
        consumer.setSlotSharingGroup(slot);

        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producer, consumer);
        flink.runDetached(jobGraph);
        awaitJobStatus(flink, jobGraph, JobStatus.RUNNING, deadline);

        // Poll (up to 50 times, 500 ms apart) until the producer thread sits in a blocking
        // buffer request.
        boolean producerBlocked = false;
        for (int i = 0; i < 50; ++i) {
            Thread thread = ASYNC_PRODUCER_THREAD;
            if (thread != null && thread.isAlive()) {
                producerBlocked = BufferBuilderTestUtils.isInBlockingBufferRequest(thread.getStackTrace());
            }
            if (producerBlocked) {
                break;
            }
            Thread.sleep(500L);
        }
        if (!producerBlocked) {
            // Build the diagnostic only on failure and tolerate a producer that never started,
            // so the test reports the real problem instead of a NullPointerException.
            Thread thread = ASYNC_PRODUCER_THREAD;
            String stackTrace = thread == null ? "<producer thread was never started>" : Arrays.toString(thread.getStackTrace());
            Assert.fail("Producer thread is not blocked: " + stackTrace);
        }

        // Poll with the same budget until the consumer thread is in state WAITING.
        boolean consumerWaiting = false;
        for (int i = 0; i < 50; ++i) {
            Thread thread = ASYNC_CONSUMER_THREAD;
            if (thread != null && thread.isAlive()) {
                consumerWaiting = thread.getState() == Thread.State.WAITING;
            }
            if (consumerWaiting) {
                break;
            }
            Thread.sleep(500L);
        }
        Assert.assertTrue("Consumer thread is not blocked.", consumerWaiting);

        flink.cancelJob(jobGraph.getJobID()).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
        awaitJobStatus(flink, jobGraph, JobStatus.CANCELED, deadline);

        // Each async thread must have recorded the exception that ended its loop.
        Assert.assertNotNull(ASYNC_PRODUCER_EXCEPTION);
        Assert.assertEquals(CancelTaskException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
        Assert.assertNotNull(ASYNC_CONSUMER_EXCEPTION);
        Assert.assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
    }

    /**
     * Polls the status of the given job every 10 ms until it equals {@code expected}, giving up
     * when the deadline has no time left.
     *
     * @param flink mini cluster running the job
     * @param jobGraph job whose status is polled
     * @param expected status to wait for
     * @param deadline overall time budget shared with the rest of the test
     * @throws Exception if the status is not reached in time or polling fails
     */
    private static void awaitJobStatus(MiniCluster flink, JobGraph jobGraph, JobStatus expected, Deadline deadline) throws Exception {
        ScheduledExecutor retryExecutor = new ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor());
        FutureUtils.retrySuccessfulWithDelay(
                        () -> flink.getJobStatus(jobGraph.getJobID()),
                        Duration.ofMillis(10L),
                        deadline,
                        status -> status == expected,
                        retryExecutor)
                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    }

    static {
        MINI_CLUSTER_RESOURCE = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setConfiguration(TaskCancelAsyncProducerConsumerITCase.getFlinkConfiguration()).build());
    }

    public static class AsyncProducer
    extends AbstractInvokable {
        public AsyncProducer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ProducerThread producer = new ProducerThread(this.getEnvironment().getWriter(0));
            ASYNC_PRODUCER_THREAD = producer;
            producer.start();
            while (producer.isAlive()) {
                try {
                    producer.join();
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        private static class ProducerThread
        extends Thread {
            private final RecordWriter<LongValue> recordWriter;

            public ProducerThread(ResultPartitionWriter partitionWriter) {
                this.recordWriter = new RecordWriterBuilder().build(partitionWriter);
            }

            @Override
            public void run() {
                LongValue current = new LongValue(0L);
                try {
                    while (true) {
                        current.setValue(current.getValue() + 1L);
                        this.recordWriter.emit((IOReadableWritable)current);
                        this.recordWriter.flushAll();
                    }
                }
                catch (Exception e) {
                    ASYNC_PRODUCER_EXCEPTION = e;
                    return;
                }
            }
        }
    }

    public static class AsyncConsumer
    extends AbstractInvokable {
        public AsyncConsumer(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ConsumerThread consumer = new ConsumerThread((InputGate)this.getEnvironment().getInputGate(0));
            ASYNC_CONSUMER_THREAD = consumer;
            consumer.start();
            while (consumer.isAlive()) {
                try {
                    consumer.join();
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        private static class ConsumerThread
        extends Thread {
            private final InputGate inputGate;

            public ConsumerThread(InputGate inputGate) {
                this.inputGate = inputGate;
            }

            @Override
            public void run() {
                try {
                    while (true) {
                        this.inputGate.getNext();
                    }
                }
                catch (Exception e) {
                    ASYNC_CONSUMER_EXCEPTION = e;
                    return;
                }
            }
        }
    }
}

