/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshot;
import org.apache.flink.streaming.api.operators.InternalTimersSnapshotReaderWriters;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.operators.TimerSerializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={ParameterizedTestExtension.class})
class InternalTimerServiceImplTest {
    /**
     * Maximum parallelism for the current test parameterization; set from the constructor and
     * passed to the queue factory and to key selection in the tests.
     */
    private final int maxParallelism;
    /**
     * Key-group range the timer services under test are created for; built in the constructor
     * from the start and end key-group arguments.
     */
    private final KeyGroupRange testKeyGroupRange;

    /**
     * Typed wrapper around Mockito's {@code any()} matcher, so stubbings and verifications on
     * the mocked {@code Triggerable} can match an arbitrary timer without a cast at each call
     * site.
     *
     * @return the value Mockito's {@code any()} matcher yields, typed as an internal timer
     */
    private static InternalTimer<Integer, String> anyInternalTimer() {
        return Mockito.<InternalTimer<Integer, String>>any();
    }

    /**
     * Creates the test instance for one parameter combination.
     *
     * @param startKeyGroup first key group of the range the tests operate on
     * @param endKeyGroup last key group of the range the tests operate on
     * @param maxParallelism maximum parallelism stored for use by the tests
     */
    InternalTimerServiceImplTest(int startKeyGroup, int endKeyGroup, int maxParallelism) {
        this.maxParallelism = maxParallelism;
        this.testKeyGroupRange = new KeyGroupRange(startKeyGroup, endKeyGroup);
    }

    /**
     * Verifies that a timer service created for the key-group range [7, 21] reports 7 as its
     * local key-group start index.
     */
    @TestTemplate
    void testKeyGroupStartIndexSetting() {
        final int firstKeyGroup = 7;
        final int lastKeyGroup = 21;
        KeyGroupRange range = new KeyGroupRange(firstKeyGroup, lastKeyGroup);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();

        InternalTimerServiceImpl<Integer, String> timerService =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                                .getIOMetricGroup(),
                        range,
                        context,
                        clock,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        createQueueFactory());

        Assertions.assertThat(timerService.getLocalKeyGroupRangeStartIdx())
                .isEqualTo(firstKeyGroup);
    }

    /**
     * Registers 100 timers (both event-time and processing-time) with distinct keys over a
     * range of 100 key groups and verifies that the per-key-group timer sets exposed by the
     * service match the sets computed independently via {@code KeyGroupRangeAssignment}.
     */
    @TestTemplate
    void testTimerAssignmentToKeyGroups() {
        final int timerCount = 100;
        final int keyGroupCount = 100;
        final int firstKeyGroup = 0;
        final int lastKeyGroup = keyGroupCount - 1;

        // Slot i holds the timers expected in key group i; null means "no timer expected".
        @SuppressWarnings("unchecked")
        Set<TimerHeapInternalTimer<Integer, String>>[] expectedByKeyGroup =
                new HashSet[keyGroupCount];

        TestKeyContext context = new TestKeyContext();
        KeyGroupRange range = new KeyGroupRange(firstKeyGroup, lastKeyGroup);
        PriorityQueueSetFactory queueFactory = createQueueFactory(range, keyGroupCount);
        InternalTimerServiceImpl<Integer, String> timerService =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()
                                .getIOMetricGroup(),
                        range,
                        context,
                        new TestProcessingTimeService(),
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        queueFactory);
        timerService.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, Mockito.mock(Triggerable.class));

        for (int n = 0; n < timerCount; n++) {
            TimerHeapInternalTimer<Integer, String> timer =
                    new TimerHeapInternalTimer<>(10 + n, n, "hello_world_" + n);
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), keyGroupCount);

            if (expectedByKeyGroup[keyGroup] == null) {
                expectedByKeyGroup[keyGroup] = new HashSet<>();
            }
            expectedByKeyGroup[keyGroup].add(timer);

            // Register the same logical timer in both time domains under the timer's key.
            context.setCurrentKey(timer.getKey());
            timerService.registerEventTimeTimer(timer.getNamespace(), timer.getTimestamp());
            timerService.registerProcessingTimeTimer(timer.getNamespace(), timer.getTimestamp());
        }

        List<?> eventTimersPerGroup = timerService.getEventTimeTimersPerKeyGroup();
        List<?> processingTimersPerGroup = timerService.getProcessingTimeTimersPerKeyGroup();

        for (int group = 0; group < expectedByKeyGroup.length; group++) {
            Set<TimerHeapInternalTimer<Integer, String>> expected = expectedByKeyGroup[group];
            Set<?> actualEvent = (Set<?>) eventTimersPerGroup.get(group);
            Set<?> actualProcessing = (Set<?>) processingTimersPerGroup.get(group);

            if (expected != null) {
                Assertions.assertThat(actualEvent).isEqualTo(expected);
                Assertions.assertThat(actualProcessing).isEqualTo(expected);
            } else {
                Assertions.assertThat(actualEvent).isEmpty();
                Assertions.assertThat(actualProcessing).isEmpty();
            }
        }
    }

    /**
     * Verifies that however many logical processing-time timers are registered, the
     * processing-time service holds exactly one active physical timer at a time, and that it
     * is scheduled for the earliest remaining logical timestamp.
     */
    @TestTemplate
    void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        PriorityQueueSetFactory queueFactory =
                new HeapPriorityQueueSetFactory(testKeyGroupRange, maxParallelism, 128);
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, queueFactory);

        int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        context.setCurrentKey(key);

        // Five logical timers across two namespaces; the earliest timestamp is 10.
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("ciao", 20L);
        timerService.registerProcessingTimeTimer("ciao", 30L);
        timerService.registerProcessingTimeTimer("hello", 10L);
        timerService.registerProcessingTimeTimer("hello", 20L);

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(5);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isEqualTo(3);
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(10L);

        // Time 10: both timers at 10 are gone, the physical timer moves to 20.
        clock.setCurrentTime(10L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(3);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isEqualTo(2);
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(20L);

        // Time 20: only the "ciao" timer at 30 remains.
        clock.setCurrentTime(20L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isZero();
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(30L);

        // Time 30: nothing is left, so no physical timer stays scheduled.
        clock.setCurrentTime(30L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(clock.getNumActiveTimers()).isZero();

        // A fresh registration schedules a physical timer again.
        timerService.registerProcessingTimeTimer("ciao", 40L);
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
    }

    /**
     * Verifies that registering a processing-time timer earlier than the one currently
     * scheduled keeps a single physical timer active and moves it to the earlier timestamp.
     */
    @TestTemplate
    void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mockTriggerable,
                        keyContext,
                        processingTimeService,
                        testKeyGroupRange,
                        createQueueFactory());

        int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        keyContext.setCurrentKey(key);

        // First registration: the physical timer is scheduled for 20.
        timerService.registerProcessingTimeTimer("ciao", 20L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(processingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(processingTimeService.getActiveTimerTimestamps()).contains(20L);

        // Earlier registration: two logical timers, still one physical timer, now at 10.
        timerService.registerProcessingTimeTimer("ciao", 10L);
        // The count is an int, so it is compared against an int literal like the sibling tests.
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(processingTimeService.getNumActiveTimers()).isOne();
        Assertions.assertThat(processingTimeService.getActiveTimerTimestamps()).contains(10L);
    }

    /**
     * Verifies that registering a new processing-time timer from inside the
     * {@code onProcessingTime} callback leaves exactly one physical timer active afterwards,
     * scheduled for the newly registered timestamp.
     */
    @TestTemplate
    void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        final InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        int key = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        context.setCurrentKey(key);

        timerService.registerProcessingTimeTimer("ciao", 10L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(10L);

        // When the callback runs, it registers a follow-up timer at 20.
        Mockito.doAnswer(
                        invocation -> {
                            timerService.registerProcessingTimeTimer("ciao", 20L);
                            return null;
                        })
                .when(triggerable)
                .onProcessingTime(anyInternalTimer());

        clock.setCurrentTime(10L);
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(20L);

        // Re-stub the callback so the next firing registers a timer at 30 instead.
        Mockito.doAnswer(
                        invocation -> {
                            timerService.registerProcessingTimeTimer("ciao", 30L);
                            return null;
                        })
                .when(triggerable)
                .onProcessingTime(anyInternalTimer());

        clock.setCurrentTime(20L);
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isOne();
        Assertions.assertThat(clock.getNumActiveTimers()).isOne();
        Assertions.assertThat(clock.getActiveTimerTimestamps()).contains(30L);
    }

    /**
     * Verifies that {@code currentProcessingTime()} reflects the time most recently set on the
     * underlying processing-time service.
     */
    @TestTemplate
    void testCurrentProcessingTime() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        clock.setCurrentTime(17L);
        Assertions.assertThat(timerService.currentProcessingTime()).isEqualTo(17L);

        clock.setCurrentTime(42L);
        Assertions.assertThat(timerService.currentProcessingTime()).isEqualTo(42L);
    }

    /**
     * Verifies that {@code currentWatermark()} returns the value most recently passed to
     * {@code advanceWatermark}.
     */
    @TestTemplate
    void testCurrentEventTime() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        timerService.advanceWatermark(17L);
        Assertions.assertThat(timerService.currentWatermark()).isEqualTo(17L);

        timerService.advanceWatermark(42L);
        Assertions.assertThat(timerService.currentWatermark()).isEqualTo(42L);
    }

    /**
     * Registers four event-time timers (two namespaces under each of two distinct keys), then
     * advances the watermark to their timestamp. Verifies that each timer is delivered to the
     * triggerable exactly once, that the fired-timers counter reads 4, and that no event-time
     * timers remain.
     */
    @TestTemplate
    void testSetAndFireEventTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        PriorityQueueSetFactory queueFactory = createQueueFactory();
        TaskIOMetricGroup ioMetrics =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        InternalTimerServiceImpl<Integer, String> timerService =
                createInternalTimerService(
                        ioMetrics,
                        testKeyGroupRange,
                        context,
                        clock,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        queueFactory);
        timerService.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2;
        do {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (key2 == key1);

        context.setCurrentKey(key1);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        context.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        Assertions.assertThat(timerService.numEventTimeTimers()).isEqualTo(4);
        Assertions.assertThat(timerService.numEventTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(timerService.numEventTimeTimers("ciao")).isEqualTo(2);

        timerService.advanceWatermark(10L);

        Assertions.assertThat(ioMetrics.getNumFiredTimers().getCount()).isEqualTo(4L);
        Mockito.verify(triggerable, Mockito.times(4)).onEventTime(anyInternalTimer());
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));

        Assertions.assertThat(timerService.numEventTimeTimers()).isZero();
    }

    /**
     * Registers four processing-time timers (two namespaces under each of two distinct keys),
     * then advances processing time to their timestamp. Verifies that each timer is delivered
     * to the triggerable exactly once, that the fired-timers counter reads 4, and that no
     * processing-time timers remain.
     */
    @TestTemplate
    void testSetAndFireProcessingTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory priorityQueueSetFactory = createQueueFactory();
        TaskIOMetricGroup taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        InternalTimerServiceImpl<Integer, String> timerService =
                createInternalTimerService(
                        taskIOMetricGroup,
                        testKeyGroupRange,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        priorityQueueSetFactory);
        timerService.startTimerService(
                IntSerializer.INSTANCE, StringSerializer.INSTANCE, mockTriggerable);

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        }

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        // The timer count is an int, so it is compared against an int literal, matching the
        // event-time variant of this test.
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(4);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isEqualTo(2);

        processingTimeService.setCurrentTime(10L);

        // The metric counter is a long, hence the long literal here.
        Assertions.assertThat(taskIOMetricGroup.getNumFiredTimers().getCount()).isEqualTo(4L);
        Mockito.verify(mockTriggerable, Mockito.times(4)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isZero();
    }

    /**
     * Registers four event-time timers (two namespaces under each of two distinct keys),
     * deletes one timer per key, and advances the watermark. Verifies that only the two
     * surviving timers are delivered to the triggerable and that no event-time timers remain.
     */
    @TestTemplate
    void testDeleteEventTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2;
        do {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (key2 == key1);

        context.setCurrentKey(key1);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        context.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        Assertions.assertThat(timerService.numEventTimeTimers()).isEqualTo(4);
        Assertions.assertThat(timerService.numEventTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(timerService.numEventTimeTimers("ciao")).isEqualTo(2);

        // Remove (key1, "hello") and (key2, "ciao"); deletion applies to the current key.
        context.setCurrentKey(key1);
        timerService.deleteEventTimeTimer("hello", 10L);

        context.setCurrentKey(key2);
        timerService.deleteEventTimeTimer("ciao", 10L);

        Assertions.assertThat(timerService.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numEventTimeTimers("ciao")).isOne();

        timerService.advanceWatermark(10L);

        Mockito.verify(triggerable, Mockito.times(2)).onEventTime(anyInternalTimer());
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(triggerable, Mockito.times(0))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(triggerable, Mockito.times(0))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Mockito.verify(triggerable, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));

        Assertions.assertThat(timerService.numEventTimeTimers()).isZero();
    }

    /**
     * Registers four processing-time timers (two namespaces under each of two distinct keys),
     * deletes one timer per key, and advances processing time. Verifies that only the two
     * surviving timers are delivered to the triggerable and that no processing-time timers
     * remain afterwards.
     */
    @TestTemplate
    void testDeleteProcessingTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mockTriggerable,
                        keyContext,
                        processingTimeService,
                        testKeyGroupRange,
                        createQueueFactory());

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        }

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(4);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isEqualTo(2);

        // Remove (key1, "hello") and (key2, "ciao"); deletion applies to the current key.
        keyContext.setCurrentKey(key1);
        timerService.deleteProcessingTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.deleteProcessingTimeTimer("ciao", 10L);

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isOne();

        processingTimeService.setCurrentTime(10L);

        Mockito.verify(mockTriggerable, Mockito.times(2)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(0))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable, Mockito.times(0))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Mockito.verify(mockTriggerable, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));

        // This test only registers processing-time timers, so the processing-time queue is
        // the one that must be empty; the event-time count would be zero regardless.
        Assertions.assertThat(timerService.numProcessingTimeTimers()).isZero();
    }

    /**
     * Registers four event-time timers as (key, namespace, timestamp) triples and verifies that
     * {@code forEachEventTimeTimer} visits exactly those triples. The key of each triple is
     * read from the key context during the callback.
     */
    @TestTemplate
    void testForEachEventTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2;
        do {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (key2 == key1);

        Set<Tuple3<Integer, String, Long>> expected = new HashSet<>();
        expected.add(Tuple3.of(key1, "ciao", 10L));
        expected.add(Tuple3.of(key1, "hello", 10L));
        expected.add(Tuple3.of(key2, "ciao", 10L));
        expected.add(Tuple3.of(key2, "hello", 10L));

        for (Tuple3<Integer, String, Long> entry : expected) {
            context.setCurrentKey(entry.f0);
            timerService.registerEventTimeTimer(entry.f1, entry.f2);
        }

        Set<Tuple3<Integer, String, Long>> visited = new HashSet<>();
        timerService.forEachEventTimeTimer(
                (namespace, timestamp) ->
                        visited.add(
                                Tuple3.of(
                                        (Integer) context.getCurrentKey(), namespace, timestamp)));

        Assertions.assertThat(visited).isEqualTo(expected);
    }

    /**
     * Registers four processing-time timers as (key, namespace, timestamp) triples and verifies
     * that {@code forEachProcessingTimeTimer} visits exactly those triples. The key of each
     * triple is read from the key context during the callback.
     */
    @TestTemplate
    void testForEachProcessingTimeTimers() throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> triggerable = Mockito.mock(Triggerable.class);
        TestKeyContext context = new TestKeyContext();
        TestProcessingTimeService clock = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        triggerable, context, clock, testKeyGroupRange, createQueueFactory());

        // Draw two distinct keys from the key-group range under test.
        int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        int key2;
        do {
            key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
        } while (key2 == key1);

        Set<Tuple3<Integer, String, Long>> expected = new HashSet<>();
        expected.add(Tuple3.of(key1, "ciao", 10L));
        expected.add(Tuple3.of(key1, "hello", 10L));
        expected.add(Tuple3.of(key2, "ciao", 10L));
        expected.add(Tuple3.of(key2, "hello", 10L));

        for (Tuple3<Integer, String, Long> entry : expected) {
            context.setCurrentKey(entry.f0);
            timerService.registerProcessingTimeTimer(entry.f1, entry.f2);
        }

        Set<Tuple3<Integer, String, Long>> visited = new HashSet<>();
        timerService.forEachProcessingTimeTimer(
                (namespace, timestamp) ->
                        visited.add(
                                Tuple3.of(
                                        (Integer) context.getCurrentKey(), namespace, timestamp)));

        Assertions.assertThat(visited).isEqualTo(expected);
    }

    /** Runs the snapshot-and-restore scenario with snapshot version 2. */
    @TestTemplate
    void testSnapshotAndRestore() throws Exception {
        final int snapshotVersion = 2;
        testSnapshotAndRestore(snapshotVersion);
    }

    /** Runs the snapshot-and-rebalancing-restore scenario with snapshot version 2. */
    @TestTemplate
    void testSnapshotAndRebalancingRestore() throws Exception {
        final int snapshotVersion = 2;
        testSnapshotAndRebalancingRestore(snapshotVersion);
    }

    /**
     * Snapshots a timer service holding two processing-time and two event-time timers, restores
     * the snapshot into a fresh service over the same key-group range, and verifies that all
     * four timers fire on the restored service.
     *
     * @param snapshotVersion version passed to the snapshot writer and reader
     * @throws Exception if snapshotting, restoring or timer firing fails
     */
    private void testSnapshotAndRestore(int snapshotVersion) throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mockTriggerable,
                        keyContext,
                        processingTimeService,
                        this.testKeyGroupRange,
                        this.createQueueFactory());

        // Two distinct keys that both map into the tested key-group range.
        int key1 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        int key2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        while (key2 == key1) {
            key2 = getKeyInKeyGroupRange(this.testKeyGroupRange, this.maxParallelism);
        }

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(timerService.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numEventTimeTimers("ciao")).isOne();

        // Serialize the timers of every key group into its own byte array.
        Map<Integer, byte[]> snapshot = new HashMap<>();
        for (Integer keyGroupIndex : this.testKeyGroupRange) {
            try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
                InternalTimersSnapshot<Integer, String> timersSnapshot =
                        timerService.snapshotTimersForKeyGroup(keyGroupIndex);
                InternalTimersSnapshotReaderWriters.getWriterForVersion(
                                snapshotVersion,
                                timersSnapshot,
                                timerService.getKeySerializer(),
                                timerService.getNamespaceSerializer())
                        .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream));
                snapshot.put(keyGroupIndex, outStream.toByteArray());
            }
        }

        // Restore into a brand-new service with its own mock, key context and clock.
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable2 = Mockito.mock(Triggerable.class);
        TestKeyContext restoredKeyContext = new TestKeyContext();
        TestProcessingTimeService restoredProcessingTimeService = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> restoredTimerService =
                restoreTimerService(
                        snapshot,
                        snapshotVersion,
                        mockTriggerable2,
                        restoredKeyContext,
                        restoredProcessingTimeService,
                        this.testKeyGroupRange,
                        this.createQueueFactory());

        // Advance both clocks to the timers' timestamp so every restored timer is due.
        restoredProcessingTimeService.setCurrentTime(10L);
        restoredTimerService.advanceWatermark(10L);

        Mockito.verify(mockTriggerable2, Mockito.times(2)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(2)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));

        Assertions.assertThat(restoredTimerService.numEventTimeTimers()).isZero();
    }

    /**
     * Snapshots a timer service, splits the snapshot at the midpoint of the key-group range and
     * restores each half into its own timer service. Verifies that each restored service fires
     * only the timers whose key belongs to its half.
     *
     * @param snapshotVersion version passed to the snapshot writer and reader
     * @throws Exception if snapshotting, restoring or timer firing fails
     */
    private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Exception {
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext = new TestKeyContext();
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        PriorityQueueSetFactory queueFactory = this.createQueueFactory();
        InternalTimerServiceImpl<Integer, String> timerService =
                createAndStartInternalTimerService(
                        mockTriggerable,
                        keyContext,
                        processingTimeService,
                        this.testKeyGroupRange,
                        queueFactory);

        // Split the full range into [start, midpoint] and [midpoint + 1, end].
        int startKeyGroup = this.testKeyGroupRange.getStartKeyGroup();
        int endKeyGroup = this.testKeyGroupRange.getEndKeyGroup();
        int midpoint = startKeyGroup + (endKeyGroup - startKeyGroup) / 2;
        KeyGroupRange subKeyGroupRange1 = new KeyGroupRange(startKeyGroup, midpoint);
        KeyGroupRange subKeyGroupRange2 = new KeyGroupRange(midpoint + 1, endKeyGroup);

        // One key per half; the halves are disjoint, so the keys are necessarily different.
        int key1 = getKeyInKeyGroupRange(subKeyGroupRange1, this.maxParallelism);
        int key2 = getKeyInKeyGroupRange(subKeyGroupRange2, this.maxParallelism);

        keyContext.setCurrentKey(key1);
        timerService.registerProcessingTimeTimer("ciao", 10L);
        timerService.registerEventTimeTimer("hello", 10L);

        keyContext.setCurrentKey(key2);
        timerService.registerEventTimeTimer("ciao", 10L);
        timerService.registerProcessingTimeTimer("hello", 10L);

        Assertions.assertThat(timerService.numProcessingTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numProcessingTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numProcessingTimeTimers("ciao")).isOne();
        Assertions.assertThat(timerService.numEventTimeTimers()).isEqualTo(2);
        Assertions.assertThat(timerService.numEventTimeTimers("hello")).isOne();
        Assertions.assertThat(timerService.numEventTimeTimers("ciao")).isOne();

        // Serialize every key group and file the bytes under the half that owns the key group.
        Map<Integer, byte[]> snapshot1 = new HashMap<>();
        Map<Integer, byte[]> snapshot2 = new HashMap<>();
        for (Integer keyGroupIndex : this.testKeyGroupRange) {
            try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
                InternalTimersSnapshot<Integer, String> timersSnapshot =
                        timerService.snapshotTimersForKeyGroup(keyGroupIndex);
                InternalTimersSnapshotReaderWriters.getWriterForVersion(
                                snapshotVersion,
                                timersSnapshot,
                                timerService.getKeySerializer(),
                                timerService.getNamespaceSerializer())
                        .writeTimersSnapshot(new DataOutputViewStreamWrapper(outStream));

                if (subKeyGroupRange1.contains(keyGroupIndex)) {
                    snapshot1.put(keyGroupIndex, outStream.toByteArray());
                } else if (subKeyGroupRange2.contains(keyGroupIndex)) {
                    snapshot2.put(keyGroupIndex, outStream.toByteArray());
                } else {
                    throw new IllegalStateException("Key-Group index doesn't belong to any sub range.");
                }
            }
        }

        // Restore each half into an independent service sharing only the queue factory.
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable1 = Mockito.mock(Triggerable.class);
        @SuppressWarnings("unchecked")
        Triggerable<Integer, String> mockTriggerable2 = Mockito.mock(Triggerable.class);
        TestKeyContext keyContext1 = new TestKeyContext();
        TestKeyContext keyContext2 = new TestKeyContext();
        TestProcessingTimeService processingTimeService1 = new TestProcessingTimeService();
        TestProcessingTimeService processingTimeService2 = new TestProcessingTimeService();
        InternalTimerServiceImpl<Integer, String> timerService1 =
                restoreTimerService(
                        snapshot1,
                        snapshotVersion,
                        mockTriggerable1,
                        keyContext1,
                        processingTimeService1,
                        subKeyGroupRange1,
                        queueFactory);
        InternalTimerServiceImpl<Integer, String> timerService2 =
                restoreTimerService(
                        snapshot2,
                        snapshotVersion,
                        mockTriggerable2,
                        keyContext2,
                        processingTimeService2,
                        subKeyGroupRange2,
                        queueFactory);

        // First half: only key1's timers may fire.
        processingTimeService1.setCurrentTime(10L);
        timerService1.advanceWatermark(10L);

        Mockito.verify(mockTriggerable1, Mockito.times(1)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable1, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable1, Mockito.never())
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));
        Mockito.verify(mockTriggerable1, Mockito.times(1)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable1, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable1, Mockito.never())
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Assertions.assertThat(timerService1.numEventTimeTimers()).isZero();

        // Second half: only key2's timers may fire.
        processingTimeService2.setCurrentTime(10L);
        timerService2.advanceWatermark(10L);

        Mockito.verify(mockTriggerable2, Mockito.times(1)).onProcessingTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.never())
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "ciao")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onProcessingTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1)).onEventTime(anyInternalTimer());
        Mockito.verify(mockTriggerable2, Mockito.never())
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key1, "hello")));
        Mockito.verify(mockTriggerable2, Mockito.times(1))
                .onEventTime(Mockito.eq(new TimerHeapInternalTimer<>(10L, key2, "ciao")));
        Assertions.assertThat(timerService2.numEventTimeTimers()).isZero();
    }

    /**
     * Draws random {@code int} keys until one is assigned to the requested key group.
     *
     * <p>The generator is created with {@code new Random()} rather than a
     * {@code System.currentTimeMillis()} seed: a millisecond seed makes every call within the
     * same millisecond return the same key.
     *
     * @param keyGroup the key group the returned key must be assigned to
     * @param maxParallelism the max parallelism used for the key-group assignment
     * @return a key that {@code KeyGroupRangeAssignment} assigns to {@code keyGroup}
     */
    private static int getKeyInKeyGroup(int keyGroup, int maxParallelism) {
        Random rand = new Random();
        int result = rand.nextInt();
        // Rejection sampling; NOTE(review): does not terminate if no int key maps to keyGroup.
        while (KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism) != keyGroup) {
            result = rand.nextInt();
        }
        return result;
    }

    /**
     * Draws random {@code int} keys until one is assigned to a key group inside {@code range}.
     *
     * <p>The generator is created with {@code new Random()} rather than a
     * {@code System.currentTimeMillis()} seed. With a millisecond seed, two calls within the same
     * millisecond return the same key, so callers that retry until they get a second, different
     * key would spin until the clock ticks.
     *
     * @param range the key-group range the returned key must fall into
     * @param maxParallelism the max parallelism used for the key-group assignment
     * @return a key whose assigned key group is contained in {@code range}
     */
    private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism) {
        Random rand = new Random();
        int result = rand.nextInt();
        // Rejection sampling; NOTE(review): does not terminate if no int key maps into range.
        while (!range.contains(KeyGroupRangeAssignment.assignToKeyGroup(result, maxParallelism))) {
            result = rand.nextInt();
        }
        return result;
    }

    /**
     * Creates an {@code Integer}-keyed, {@code String}-namespaced timer service with an
     * unregistered IO metric group and starts it immediately with the given triggerable.
     *
     * @param triggerable target passed to {@code startTimerService}
     * @param keyContext key context handed to the service
     * @param processingTimeService processing-time source handed to the service
     * @param keyGroupList key-group range handed to the service
     * @param priorityQueueSetFactory factory used to build the two timer queues
     * @return the started timer service
     */
    private static InternalTimerServiceImpl<Integer, String> createAndStartInternalTimerService(Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupList, PriorityQueueSetFactory priorityQueueSetFactory) {
        InternalTimerServiceImpl<Integer, String> service =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
                        keyGroupList,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        priorityQueueSetFactory);
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return service;
    }

    /**
     * Creates a timer service, feeds it the serialized timers of every key group in
     * {@code keyGroupsList} that has an entry in {@code state}, and only then starts it.
     *
     * @param state serialized timer snapshots by key-group index; key groups without an entry
     *     are skipped
     * @param snapshotVersion version used to select the snapshot reader
     * @param triggerable target passed to {@code startTimerService}
     * @param keyContext key context handed to the service
     * @param processingTimeService processing-time source handed to the service
     * @param keyGroupsList key-group range of the restored service
     * @param priorityQueueSetFactory factory used to build the two timer queues
     * @return the restored and started timer service
     * @throws Exception if reading a snapshot fails
     */
    private static InternalTimerServiceImpl<Integer, String> restoreTimerService(Map<Integer, byte[]> state, int snapshotVersion, Triggerable<Integer, String> triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupRange keyGroupsList, PriorityQueueSetFactory priorityQueueSetFactory) throws Exception {
        InternalTimerServiceImpl<Integer, String> service =
                createInternalTimerService(
                        UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(),
                        keyGroupsList,
                        keyContext,
                        processingTimeService,
                        IntSerializer.INSTANCE,
                        StringSerializer.INSTANCE,
                        priorityQueueSetFactory);

        for (Integer keyGroupIndex : keyGroupsList) {
            byte[] serializedTimers = state.get(keyGroupIndex);
            // Distinguish "no entry" from a null value the same way containsKey did.
            if (serializedTimers == null && !state.containsKey(keyGroupIndex)) {
                continue;
            }
            try (ByteArrayInputStream inputStream = new ByteArrayInputStream(serializedTimers)) {
                InternalTimersSnapshot<?, ?> restoredTimersSnapshot =
                        InternalTimersSnapshotReaderWriters.getReaderForVersion(
                                        snapshotVersion,
                                        InternalTimerServiceImplTest.class.getClassLoader())
                                .readTimersSnapshot(new DataInputViewStreamWrapper(inputStream));
                service.restoreTimersForKeyGroup(restoredTimersSnapshot, keyGroupIndex);
            }
        }

        // Timers are restored before start-up, mirroring the original statement order.
        service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, triggerable);
        return service;
    }

    /**
     * Builds the queue factory for this test instance's key-group range and max parallelism by
     * delegating to the overridable two-argument overload.
     */
    private PriorityQueueSetFactory createQueueFactory() {
        KeyGroupRange range = this.testKeyGroupRange;
        int numKeyGroups = this.maxParallelism;
        return createQueueFactory(range, numKeyGroups);
    }

    /**
     * Creates the heap-based priority-queue-set factory used by the tests. The method is
     * {@code protected}, so subclasses can supply a different factory.
     *
     * @param keyGroupRange key-group range passed to the factory
     * @param numKeyGroups total number of key groups passed to the factory
     * @return a new {@code HeapPriorityQueueSetFactory}
     */
    protected PriorityQueueSetFactory createQueueFactory(KeyGroupRange keyGroupRange, int numKeyGroups) {
        // NOTE(review): presumably the minimum capacity of each created queue; confirm against
        // the HeapPriorityQueueSetFactory constructor.
        final int queueSizeHint = 128;
        PriorityQueueSetFactory factory = new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, queueSizeHint);
        return factory;
    }

    @Parameters(name="start = {0}, end = {1}, max = {2}")
    private static Collection<Object[]> keyRanges() {
        return Arrays.asList({0, 32766, (short)Short.MAX_VALUE}, {0, 10, (short)Short.MAX_VALUE}, {0, 10, 10}, {10, 32766, (short)Short.MAX_VALUE}, {2, 5, 100}, {2, 5, 6});
    }

    /**
     * Constructs (but does not start) a timer service whose processing-time and event-time
     * queues are created from the given factory under the names
     * {@code "__test_processing_timers"} and {@code "__test_event_timers"}.
     *
     * @param taskIOMetricGroup metric group passed to the service
     * @param keyGroupsList key-group range passed to the service
     * @param keyContext key context passed to the service
     * @param processingTimeService processing-time source passed to the service
     * @param keySerializer serializer for timer keys
     * @param namespaceSerializer serializer for timer namespaces
     * @param priorityQueueSetFactory factory used to create both timer queues
     * @param <K> key type
     * @param <N> namespace type
     * @return the newly constructed timer service
     */
    private static <K, N> InternalTimerServiceImpl<K, N> createInternalTimerService(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupsList, KeyContext keyContext, ProcessingTimeService processingTimeService, TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, PriorityQueueSetFactory priorityQueueSetFactory) {
        // One timer serializer is shared by both queues.
        TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
        return new InternalTimerServiceImpl<>(
                taskIOMetricGroup,
                keyGroupsList,
                keyContext,
                processingTimeService,
                createTimerQueue("__test_processing_timers", timerSerializer, priorityQueueSetFactory),
                createTimerQueue("__test_event_timers", timerSerializer, priorityQueueSetFactory),
                StreamTaskCancellationContext.alwaysRunning());
    }

    /**
     * Asks the factory for a timer queue registered under {@code name}.
     *
     * @param name name passed to the factory for the new queue
     * @param timerSerializer serializer for the queue's timers
     * @param priorityQueueSetFactory factory that creates the queue
     * @param <K> key type
     * @param <N> namespace type
     * @return the queue created by the factory
     */
    private static <K, N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerQueue(String name, TimerSerializer<K, N> timerSerializer, PriorityQueueSetFactory priorityQueueSetFactory) {
        KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> timerQueue =
                priorityQueueSetFactory.create(name, timerSerializer);
        return timerQueue;
    }

    private static class TestKeyContext
    implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object key) {
            this.key = key;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }
}

