/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link StreamOperatorWrapper}: finishing a chain of wrapped operators, propagation of
 * an exception thrown by {@code finish()}, and traversal with
 * {@link StreamOperatorWrapper.ReadIterator}.
 *
 * <p>Each test runs against a chain of {@link #numOperators} {@link TestOneInputStreamOperator}s
 * that record what happens to them in a shared {@link #output} queue.
 */
class StreamOperatorWrapperTest {
    /** Timer service shared by all tests; started once and shut down after the last test. */
    private static SystemProcessingTimeService timerService;

    /** Number of operators (and wrappers) in the chain built by {@link #setup()}. */
    private static final int numOperators = 3;

    /** Wrappers in chain order; index 0 is the one created with {@code isHead == true}. */
    private List<StreamOperatorWrapper<?, ?>> operatorWrappers;

    /** Messages recorded by the test operators, in the order they were added. */
    private ConcurrentLinkedQueue<Object> output;

    private volatile StreamTask<?, ?> containingTask;

    @BeforeAll
    static void startTimeService() {
        // The future only serves as the exception handler target; nothing reads it afterwards.
        CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    @AfterAll
    static void shutdownTimeService() {
        timerService.shutdownService();
    }

    /**
     * Builds a fresh mock task and a doubly linked chain of {@link #numOperators} operator
     * wrappers, each with its own mailbox executor (priority = operator index) and its own
     * {@link TimerMailController}.
     */
    @BeforeEach
    void setup() throws Exception {
        operatorWrappers = new ArrayList<>();
        output = new ConcurrentLinkedQueue<>();

        try (MockEnvironment env = MockEnvironment.builder().build()) {
            containingTask = new MockStreamTaskBuilder(env).build();

            for (int i = 0; i < numOperators; i++) {
                MailboxExecutor mailboxExecutor =
                        containingTask.getMailboxExecutorFactory().createExecutor(i);
                TimerMailController timerMailController =
                        new TimerMailController(containingTask, mailboxExecutor);
                ProcessingTimeServiceImpl processingTimeService =
                        new ProcessingTimeServiceImpl(
                                timerService, timerMailController::wrapCallback);

                TestOneInputStreamOperator streamOperator =
                        new TestOneInputStreamOperator(
                                "Operator" + i,
                                output,
                                processingTimeService,
                                mailboxExecutor,
                                timerMailController);

                // Only the first wrapper is created as the head of the chain.
                StreamOperatorWrapper<?, ?> operatorWrapper =
                        new StreamOperatorWrapper<>(
                                streamOperator,
                                Optional.ofNullable(streamOperator.getProcessingTimeService()),
                                mailboxExecutor,
                                i == 0);
                operatorWrappers.add(operatorWrapper);
            }

            // Link neighbours in both directions; the first wrapper gets a null predecessor.
            StreamOperatorWrapper<?, ?> previous = null;
            for (StreamOperatorWrapper<?, ?> current : operatorWrappers) {
                if (previous != null) {
                    previous.setNext(current);
                }
                current.setPrevious(previous);
                previous = current;
            }
        }
    }

    @AfterEach
    void teardown() throws Exception {
        containingTask.cleanUpInternal();
    }

    /**
     * Finishes the chain from the first wrapper and checks the exact sequence of recorded
     * messages.
     *
     * <p>Because the assertion is exact, it also verifies that neither the
     * {@code "Timer not triggered"} nor the {@code "Timer to put in mailbox when finishing
     * operator"} message is ever recorded.
     */
    @Test
    void testFinish() throws Exception {
        output.clear();
        operatorWrappers.get(0).finish(containingTask.getActionExecutor(), StopMode.DRAIN);

        List<Object> expected = new ArrayList<>();
        for (int i = 0; i < operatorWrappers.size(); i++) {
            String prefix = "[Operator" + i + "]";
            Collections.addAll(
                    expected,
                    prefix + ": End of input",
                    prefix + ": Timer that was in mailbox before closing operator",
                    prefix + ": Bye",
                    prefix + ": Mail to put in mailbox when finishing operator");
        }

        // The first two entries (Operator0's "End of input" and its mailbox timer) are not
        // expected. NOTE(review): presumably finish() does not call endInput() on the head
        // wrapper - confirm against StreamOperatorWrapper.
        Assertions.assertThat(output)
                .as("Output was not correct.")
                .containsExactlyElementsOf(expected.subList(2, expected.size()));
    }

    /** An exception thrown by the operator's {@code finish()} must surface from the wrapper. */
    @Test
    void testFinishingOperatorWithException() {
        AbstractStreamOperator<Void> streamOperator =
                new AbstractStreamOperator<Void>() {
                    @Override
                    public void finish() throws Exception {
                        throw new Exception("test exception at finishing");
                    }
                };

        StreamOperatorWrapper<?, ?> operatorWrapper =
                new StreamOperatorWrapper<>(
                        streamOperator,
                        Optional.ofNullable(streamOperator.getProcessingTimeService()),
                        containingTask
                                .getMailboxExecutorFactory()
                                .createExecutor(Integer.MAX_VALUE - 1),
                        true);

        Assertions.assertThatThrownBy(
                        () ->
                                operatorWrapper.finish(
                                        containingTask.getActionExecutor(), StopMode.DRAIN))
                .hasMessageContaining("test exception at finishing");
    }

    /** Walks the chain forwards from the first wrapper and backwards from the last one. */
    @Test
    void testReadIterator() {
        int count = operatorWrappers.size();

        // Forward traversal: Operator0, Operator1, ...
        Iterator<StreamOperatorWrapper<?, ?>> forward =
                new StreamOperatorWrapper.ReadIterator(operatorWrappers.get(0), false);
        for (int i = 0; i < count; i++) {
            assertNextIsOperator(forward, i);
        }
        Assertions.assertThat(forward).isExhausted();

        // Reverse traversal: starts at the last wrapper and ends at Operator0.
        Iterator<StreamOperatorWrapper<?, ?>> backward =
                new StreamOperatorWrapper.ReadIterator(operatorWrappers.get(count - 1), true);
        for (int i = count - 1; i >= 0; i--) {
            assertNextIsOperator(backward, i);
        }
        Assertions.assertThat(backward).isExhausted();
    }

    /** Asserts that the iterator's next element is a non-null wrapper around "Operator{index}". */
    private void assertNextIsOperator(Iterator<StreamOperatorWrapper<?, ?>> it, int index) {
        Assertions.assertThat(it).hasNext();
        StreamOperatorWrapper<?, ?> next = it.next();
        Assertions.assertThat(next).isNotNull();
        TestOneInputStreamOperator operator = getStreamOperatorFromWrapper(next);
        Assertions.assertThat(operator.getName()).isEqualTo("Operator" + index);
    }

    private TestOneInputStreamOperator getStreamOperatorFromWrapper(
            StreamOperatorWrapper<?, ?> operatorWrapper) {
        return (TestOneInputStreamOperator)
                Objects.requireNonNull(operatorWrapper.getStreamOperator());
    }

    /**
     * Wraps timer callbacks so that firing them is deferred to the task's mailbox, and exposes
     * per-callback latches that tell a test how far a fired timer has progressed.
     */
    private static class TimerMailController {
        private final StreamTask<?, ?> containingTask;
        private final MailboxExecutor mailboxExecutor;

        /** Triggered when the wrapped callback starts running, before the mailbox hand-off. */
        private final ConcurrentHashMap<ProcessingTimeService.ProcessingTimeCallback, OneShotLatch>
                puttingLatches;

        /** Triggered after the deferred callback's {@code onProcessingTime} call has returned. */
        private final ConcurrentHashMap<ProcessingTimeService.ProcessingTimeCallback, OneShotLatch>
                inMailboxLatches;

        TimerMailController(StreamTask<?, ?> containingTask, MailboxExecutor mailboxExecutor) {
            this.containingTask = containingTask;
            this.mailboxExecutor = mailboxExecutor;
            this.puttingLatches = new ConcurrentHashMap<>();
            this.inMailboxLatches = new ConcurrentHashMap<>();
        }

        /** Returns the "putting" latch, or {@code null} if the callback was never wrapped. */
        OneShotLatch getPuttingLatch(ProcessingTimeService.ProcessingTimeCallback callback) {
            return puttingLatches.get(callback);
        }

        /** Returns the "in mailbox" latch, or {@code null} if the callback was never wrapped. */
        OneShotLatch getInMailboxLatch(ProcessingTimeService.ProcessingTimeCallback callback) {
            return inMailboxLatches.get(callback);
        }

        /**
         * Registers both latches for {@code callback} and returns a callback that triggers them
         * around handing the original callback over via
         * {@code containingTask.deferCallbackToMailbox}.
         */
        ProcessingTimeService.ProcessingTimeCallback wrapCallback(
                ProcessingTimeService.ProcessingTimeCallback callback) {
            puttingLatches.put(callback, new OneShotLatch());
            inMailboxLatches.put(callback, new OneShotLatch());

            return timestamp -> {
                puttingLatches.get(callback).trigger();
                containingTask
                        .deferCallbackToMailbox(mailboxExecutor, callback)
                        .onProcessingTime(timestamp);
                inMailboxLatches.get(callback).trigger();
            };
        }
    }

    /**
     * Operator that ignores its input and records lifecycle events, timers and mails in the
     * shared output queue, each prefixed with {@code "[name]"}.
     */
    private static class TestOneInputStreamOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String>, BoundedOneInput {
        private static final long serialVersionUID = 1L;

        private final String name;
        private final ConcurrentLinkedQueue<Object> output;
        private final ProcessingTimeService processingTimeService;
        private final MailboxExecutor mailboxExecutor;
        private final TimerMailController timerMailController;

        TestOneInputStreamOperator(
                String name,
                ConcurrentLinkedQueue<Object> output,
                ProcessingTimeService processingTimeService,
                MailboxExecutor mailboxExecutor,
                TimerMailController timerMailController) {
            this.name = name;
            this.output = output;
            this.processingTimeService = processingTimeService;
            this.mailboxExecutor = mailboxExecutor;
            this.timerMailController = timerMailController;

            // A timer scheduled for Long.MAX_VALUE; its message is never expected in the output.
            processingTimeService.registerTimer(
                    Long.MAX_VALUE, t2 -> output.add("[" + name + "]: Timer not triggered"));
            super.setProcessingTimeService(processingTimeService);
        }

        public String getName() {
            return name;
        }

        @Override
        public void processElement(StreamRecord<String> element) {
            // Elements are irrelevant for these tests.
        }

        /**
         * Records the end of input, registers a timer for timestamp 0 and blocks until that
         * timer's wrapped callback has finished the mailbox hand-off.
         */
        @Override
        public void endInput() throws InterruptedException {
            output.add("[" + name + "]: End of input");

            ProcessingTimeService.ProcessingTimeCallback callback =
                    t1 ->
                            output.add(
                                    "["
                                            + name
                                            + "]: Timer that was in mailbox before closing operator");
            processingTimeService.registerTimer(0L, callback);
            timerMailController.getInMailboxLatch(callback).await();
        }

        /**
         * Registers a timer, enqueues a mail and records "Bye".
         *
         * <p>The timer registration must return a non-null future, yet the callback must never
         * have reached {@link TimerMailController#wrapCallback} (no latch exists for it).
         */
        @Override
        public void finish() throws Exception {
            ProcessingTimeService.ProcessingTimeCallback callback =
                    t1 ->
                            output.add(
                                    "["
                                            + name
                                            + "]: Timer to put in mailbox when finishing operator");
            // The cast pins the assertThat(Future) overload for the returned future.
            Assertions.assertThat((Future<?>) processingTimeService.registerTimer(0L, callback))
                    .isNotNull();
            Assertions.assertThat(timerMailController.getPuttingLatch(callback)).isNull();

            mailboxExecutor.execute(
                    () ->
                            output.add(
                                    "["
                                            + name
                                            + "]: Mail to put in mailbox when finishing operator"),
                    "");

            output.add("[" + name + "]: Bye");
        }
    }
}

