/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sorted.state;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class BatchExecutionInternalTimeServiceWithAsyncStateTest {
    public static final IntSerializer KEY_SERIALIZER = new IntSerializer();
    BatchExecutionKeyedStateBackend<Integer> keyedStatedBackend;
    InternalTimeServiceManager<Integer> timeServiceManager;
    TestProcessingTimeService processingTimeService;
    AsyncExecutionController<Integer> aec;

    BatchExecutionInternalTimeServiceWithAsyncStateTest() {
    }

    @BeforeEach
    public void setup() {
        this.keyedStatedBackend = new BatchExecutionKeyedStateBackend((TypeSerializer)KEY_SERIALIZER, new KeyGroupRange(0, 1), new ExecutionConfig());
        this.processingTimeService = new TestProcessingTimeService();
        this.aec = new AsyncExecutionController((MailboxExecutor)new SyncMailboxExecutor(), (a, b) -> {}, (StateExecutor)new MockStateExecutor(), new DeclarationManager(), 1, 100, 1000L, 1, null, null);
        this.timeServiceManager = BatchExecutionInternalTimeServiceManager.create((TaskIOMetricGroup)UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), (PriorityQueueSetFactory)new AsyncKeyedStateBackendAdaptor(this.keyedStatedBackend), null, (ClassLoader)this.getClass().getClassLoader(), (KeyContext)new DummyKeyContext(), (ProcessingTimeService)this.processingTimeService, Collections.emptyList(), (StreamTaskCancellationContext)StreamTaskCancellationContext.alwaysRunning());
    }

    @Test
    void testForEachEventTimeTimerUnsupported() {
        BatchExecutionInternalTimeServiceWithAsyncState timeService = new BatchExecutionInternalTimeServiceWithAsyncState((ProcessingTimeService)new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(timer -> {}));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> timeService.forEachEventTimeTimer((o, aLong) -> Assertions.fail((String)"The forEachEventTimeTimer() should not be supported"))).isInstanceOf(UnsupportedOperationException.class)).hasMessageContaining("The BatchExecutionInternalTimeService should not be used in State Processor API");
    }

    @Test
    void testForEachProcessingTimeTimerUnsupported() {
        BatchExecutionInternalTimeServiceWithAsyncState timeService = new BatchExecutionInternalTimeServiceWithAsyncState((ProcessingTimeService)new TestProcessingTimeService(), LambdaTrigger.eventTimeTrigger(timer -> {}));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> timeService.forEachEventTimeTimer((o, aLong) -> Assertions.fail((String)"The forEachProcessingTimeTimer() should not be supported"))).isInstanceOf(UnsupportedOperationException.class)).hasMessageContaining("The BatchExecutionInternalTimeService should not be used in State Processor API");
    }

    @Test
    void testFiringEventTimeTimers() throws Exception {
        ArrayList timers = new ArrayList();
        InternalTimerService<VoidNamespace> timerService = this.buildTimerService(LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        this.timeServiceManager.advanceWatermark(new Watermark(1000L));
        timerService.deleteEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        this.keyedStatedBackend.setCurrentKey((Object)2);
        Assertions.assertThat(timers).containsExactly((Object[])new Long[]{150L});
    }

    @Test
    void testSettingSameKeyDoesNotFireTimers() {
        ArrayList timers = new ArrayList();
        InternalTimerService<VoidNamespace> timerService = this.buildTimerService(LambdaTrigger.eventTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        this.keyedStatedBackend.setCurrentKey((Object)1);
        Assertions.assertThat(timers).isEmpty();
    }

    @Test
    void testCurrentWatermark() throws Exception {
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess<Integer, VoidNamespace> eventTimeTrigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, timerService) -> {
            Assertions.assertThat((long)timerService.currentWatermark()).isEqualTo(Long.MAX_VALUE);
            timers.add(timer.getTimestamp());
        });
        InternalTimerService<VoidNamespace> timerService2 = this.buildTimerService(eventTimeTrigger);
        eventTimeTrigger.setTimerService(timerService2);
        Assertions.assertThat((long)timerService2.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService2.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 123L);
        Assertions.assertThat((long)timerService2.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.timeServiceManager.advanceWatermark(new Watermark(1000L));
        Assertions.assertThat((long)timerService2.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        this.keyedStatedBackend.setCurrentKey((Object)2);
        Assertions.assertThat((long)timerService2.currentWatermark()).isEqualTo(Long.MIN_VALUE);
        timerService2.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 124L);
        this.timeServiceManager.advanceWatermark(Watermark.MAX_WATERMARK);
        Assertions.assertThat(timers).containsExactly((Object[])new Long[]{123L, 124L});
    }

    @Test
    void testProcessingTimeTimers() {
        ArrayList timers = new ArrayList();
        InternalTimerService<VoidNamespace> timerService = this.buildTimerService(LambdaTrigger.processingTimeTrigger(timer -> timers.add(timer.getTimestamp())));
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat((int)this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey((Object)2);
        Assertions.assertThat(timers).containsExactly((Object[])new Long[]{150L});
    }

    @Test
    void testIgnoringEventTimeTimersFromWithinCallback() {
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess<Integer, VoidNamespace> trigger = TriggerWithTimerServiceAccess.eventTimeTrigger((timer, ts) -> {
            timers.add(timer.getTimestamp());
            ts.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, timer.getTimestamp() + 20L);
        });
        InternalTimerService<VoidNamespace> timerService = this.buildTimerService(trigger);
        trigger.setTimerService(timerService);
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerEventTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat((int)this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey((Object)2);
        Assertions.assertThat(timers).containsExactly((Object[])new Long[]{150L});
    }

    @Test
    void testIgnoringProcessingTimeTimersFromWithinCallback() {
        ArrayList timers = new ArrayList();
        TriggerWithTimerServiceAccess<Integer, VoidNamespace> trigger = TriggerWithTimerServiceAccess.processingTimeTrigger((timer, ts) -> {
            timers.add(timer.getTimestamp());
            ts.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, timer.getTimestamp() + 20L);
        });
        InternalTimerService<VoidNamespace> timerService = this.buildTimerService(trigger);
        trigger.setTimerService(timerService);
        this.keyedStatedBackend.setCurrentKey((Object)1);
        timerService.registerProcessingTimeTimer((Object)VoidNamespace.INSTANCE, 150L);
        Assertions.assertThat((int)this.processingTimeService.getNumActiveTimers()).isZero();
        this.keyedStatedBackend.setCurrentKey((Object)2);
        Assertions.assertThat(timers).containsExactly((Object[])new Long[]{150L});
    }

    private InternalTimerService<VoidNamespace> buildTimerService(Triggerable<Integer, VoidNamespace> trigger) {
        InternalTimerService timerService = this.timeServiceManager.getInternalTimerService("test", (TypeSerializer)KEY_SERIALIZER, (TypeSerializer)new VoidNamespaceSerializer(), trigger);
        ((BatchExecutionInternalTimeServiceWithAsyncState)timerService).setup(this.aec);
        return timerService;
    }

    private static class DummyKeyContext
    implements KeyContext {
        private DummyKeyContext() {
        }

        public void setCurrentKey(Object key) {
        }

        public Object getCurrentKey() {
            return null;
        }
    }

    private static class LambdaTrigger<K, N>
    implements Triggerable<K, N> {
        private final Consumer<InternalTimer<K, N>> eventTimeHandler;
        private final Consumer<InternalTimer<K, N>> processingTimeHandler;

        public static <K, N> LambdaTrigger<K, N> eventTimeTrigger(Consumer<InternalTimer<K, N>> eventTimeHandler) {
            return new LambdaTrigger<K, N>(eventTimeHandler, timer -> Assertions.fail((String)"We did not expect processing timer to be triggered."));
        }

        public static <K, N> LambdaTrigger<K, N> processingTimeTrigger(Consumer<InternalTimer<K, N>> processingTimeHandler) {
            return new LambdaTrigger<K, N>(timer -> Assertions.fail((String)"We did not expect event timer to be triggered."), processingTimeHandler);
        }

        private LambdaTrigger(Consumer<InternalTimer<K, N>> eventTimeHandler, Consumer<InternalTimer<K, N>> processingTimeHandler) {
            this.eventTimeHandler = eventTimeHandler;
            this.processingTimeHandler = processingTimeHandler;
        }

        public void onEventTime(InternalTimer<K, N> timer) throws Exception {
            this.eventTimeHandler.accept(timer);
        }

        public void onProcessingTime(InternalTimer<K, N> timer) throws Exception {
            this.processingTimeHandler.accept(timer);
        }
    }

    private static class TriggerWithTimerServiceAccess<K, N>
    implements Triggerable<K, N> {
        private InternalTimerService<N> timerService;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler;
        private final BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler;

        private TriggerWithTimerServiceAccess(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler, BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler) {
            this.eventTimeHandler = eventTimeHandler;
            this.processingTimeHandler = processingTimeHandler;
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> eventTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> eventTimeHandler) {
            return new TriggerWithTimerServiceAccess<K, N>(eventTimeHandler, (timer, timeService) -> Assertions.fail((String)"We did not expect processing timer to be triggered."));
        }

        public static <K, N> TriggerWithTimerServiceAccess<K, N> processingTimeTrigger(BiConsumer<InternalTimer<K, N>, InternalTimerService<N>> processingTimeHandler) {
            return new TriggerWithTimerServiceAccess<K, N>((timer, timeService) -> Assertions.fail((String)"We did not expect event timer to be triggered."), processingTimeHandler);
        }

        public void setTimerService(InternalTimerService<N> timerService) {
            this.timerService = timerService;
        }

        public void onEventTime(InternalTimer<K, N> timer) throws Exception {
            this.eventTimeHandler.accept(timer, this.timerService);
        }

        public void onProcessingTime(InternalTimer<K, N> timer) throws Exception {
            this.processingTimeHandler.accept(timer, this.timerService);
        }
    }
}

