/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for {@link StreamingRuntimeContext}.
 *
 * <p>The {@code *Instantiation} tests register {@link Path} as a Kryo type on an {@link
 * ExecutionConfig}, request a state from the runtime context, capture the state descriptor that
 * reaches the (mocked) keyed state backend and assert that its serializer is a {@link
 * KryoSerializer} which knows about the registered type. The {@code *ByDefault} tests check that
 * freshly created list/map state is empty rather than {@code null}.
 */
class StreamingRuntimeContextTest {

    // ------------------------------------------------------------------------
    //  State API v1
    // ------------------------------------------------------------------------

    @Test
    void testValueStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

        ValueStateDescriptor<TaskInfo> descr = new ValueStateDescriptor<>("name", TaskInfo.class);
        context.getState(descr);

        StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testReducingStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

        @SuppressWarnings("unchecked")
        ReduceFunction<TaskInfo> reducer =
                (ReduceFunction<TaskInfo>) Mockito.mock(ReduceFunction.class);
        ReducingStateDescriptor<TaskInfo> descr =
                new ReducingStateDescriptor<>("name", reducer, TaskInfo.class);
        context.getReducingState(descr);

        StateDescriptor<?, ?> descrIntercepted = (StateDescriptor<?, ?>) descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testAggregatingStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

        @SuppressWarnings("unchecked")
        AggregateFunction<String, TaskInfo, String> aggregate =
                (AggregateFunction<String, TaskInfo, String>) Mockito.mock(AggregateFunction.class);
        org.apache.flink.api.common.state.AggregatingStateDescriptor<String, TaskInfo, String>
                descr =
                        new org.apache.flink.api.common.state.AggregatingStateDescriptor<>(
                                "name", aggregate, TaskInfo.class);
        context.getAggregatingState(descr);

        org.apache.flink.api.common.state.AggregatingStateDescriptor<?, ?, ?> descrIntercepted =
                (org.apache.flink.api.common.state.AggregatingStateDescriptor<?, ?, ?>)
                        descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testListStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

        org.apache.flink.api.common.state.ListStateDescriptor<TaskInfo> descr =
                new org.apache.flink.api.common.state.ListStateDescriptor<>(
                        "name", TaskInfo.class);
        context.getListState(descr);

        org.apache.flink.api.common.state.ListStateDescriptor<?> descrIntercepted =
                (org.apache.flink.api.common.state.ListStateDescriptor<?>) descriptorCapture.get();

        // The descriptor's own serializer wraps the element serializer in a list serializer.
        TypeSerializer<?> serializer = descrIntercepted.getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(ListSerializer.class);
        assertKryoSerializerWithRegisteredType(descrIntercepted.getElementSerializer());
    }

    @Test
    void testListStateReturnsEmptyListByDefault() throws Exception {
        StreamingRuntimeContext context = createRuntimeContext();

        org.apache.flink.api.common.state.ListStateDescriptor<String> descr =
                new org.apache.flink.api.common.state.ListStateDescriptor<>("name", String.class);
        ListState<String> state = context.getListState(descr);

        Iterable<String> value = state.get();
        Assertions.assertThat(value).isNotNull();
        Assertions.assertThat(value.iterator()).isExhausted();
    }

    @Test
    void testMapStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, false);

        MapStateDescriptor<String, TaskInfo> descr =
                new MapStateDescriptor<>("name", String.class, TaskInfo.class);
        context.getMapState(descr);

        MapStateDescriptor<?, ?> descrIntercepted =
                (MapStateDescriptor<?, ?>) descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getValueSerializer());
    }

    @Test
    void testMapStateReturnsEmptyMapByDefault() throws Exception {
        StreamingRuntimeContext context = createMapOperatorRuntimeContext();

        MapStateDescriptor<Integer, String> descr =
                new MapStateDescriptor<>("name", Integer.class, String.class);
        MapState<Integer, String> state = context.getMapState(descr);

        Iterable<java.util.Map.Entry<Integer, String>> value = state.entries();
        Assertions.assertThat(value).isNotNull();
        Assertions.assertThat(value.iterator()).isExhausted();
    }

    // ------------------------------------------------------------------------
    //  State API v2
    // ------------------------------------------------------------------------

    @Test
    void testV2ValueStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

        org.apache.flink.api.common.state.v2.ValueStateDescriptor<TaskInfo> descr =
                new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
                        "name", TypeInformation.of(TaskInfo.class));
        context.getValueState(descr);

        org.apache.flink.api.common.state.v2.ValueStateDescriptor<?> descrIntercepted =
                (org.apache.flink.api.common.state.v2.ValueStateDescriptor<?>)
                        descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testV2ListStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

        ListStateDescriptor<TaskInfo> descr =
                new ListStateDescriptor<>("name", TypeInformation.of(TaskInfo.class));
        context.getListState(descr);

        ListStateDescriptor<?> descrIntercepted = (ListStateDescriptor<?>) descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testV2MapStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

        org.apache.flink.api.common.state.v2.MapStateDescriptor<String, TaskInfo> descr =
                new org.apache.flink.api.common.state.v2.MapStateDescriptor<>(
                        "name", TypeInformation.of(String.class), TypeInformation.of(TaskInfo.class));
        context.getMapState(descr);

        org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?> descrIntercepted =
                (org.apache.flink.api.common.state.v2.MapStateDescriptor<?, ?>)
                        descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testV2ReducingStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

        @SuppressWarnings("unchecked")
        ReduceFunction<TaskInfo> reducer =
                (ReduceFunction<TaskInfo>) Mockito.mock(ReduceFunction.class);
        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<TaskInfo> descr =
                new org.apache.flink.api.common.state.v2.ReducingStateDescriptor<>(
                        "name", reducer, TypeInformation.of(TaskInfo.class));
        context.getReducingState(descr);

        org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?> descrIntercepted =
                (org.apache.flink.api.common.state.v2.ReducingStateDescriptor<?>)
                        descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    @Test
    void testV2AggregatingStateInstantiation() throws Exception {
        ExecutionConfig config = createConfigWithRegisteredKryoType();
        AtomicReference<Object> descriptorCapture = new AtomicReference<>();
        StreamingRuntimeContext context = createRuntimeContext(descriptorCapture, config, true);

        @SuppressWarnings("unchecked")
        AggregateFunction<String, TaskInfo, String> aggregate =
                (AggregateFunction<String, TaskInfo, String>) Mockito.mock(AggregateFunction.class);
        AggregatingStateDescriptor<String, TaskInfo, String> descr =
                new AggregatingStateDescriptor<>(
                        "name", aggregate, TypeInformation.of(TaskInfo.class));
        context.getAggregatingState(descr);

        AggregatingStateDescriptor<?, ?, ?> descrIntercepted =
                (AggregatingStateDescriptor<?, ?, ?>) descriptorCapture.get();
        assertKryoSerializerWithRegisteredType(descrIntercepted.getSerializer());
    }

    // ------------------------------------------------------------------------
    //  Shared fixtures and assertions
    // ------------------------------------------------------------------------

    /** Creates an {@link ExecutionConfig} whose serializer config has {@link Path} registered with Kryo. */
    private static ExecutionConfig createConfigWithRegisteredKryoType() {
        ExecutionConfig config = new ExecutionConfig();
        ((SerializerConfigImpl) config.getSerializerConfig()).registerKryoType(Path.class);
        return config;
    }

    /**
     * Asserts that {@code serializer} is a {@link KryoSerializer} and that its Kryo instance holds
     * a registration with a positive id for {@link Path}.
     */
    private static void assertKryoSerializerWithRegisteredType(TypeSerializer<?> serializer) {
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(
                        ((KryoSerializer<?>) serializer)
                                .getKryo()
                                .getRegistration(Path.class)
                                .getId())
                .isPositive();
    }

    /** Builds a {@link SerializerFactory} that creates serializers from {@code config}'s serializer config. */
    private static SerializerFactory serializerFactoryFor(final ExecutionConfig config) {
        // SerializerFactory#createSerializer is a generic method, so it cannot be a lambda.
        return new SerializerFactory() {
            @Override
            public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
                return typeInformation.createSerializer(config.getSerializerConfig());
            }
        };
    }

    /** Runtime context backed by the mocked operator that serves real map state. */
    private StreamingRuntimeContext createMapOperatorRuntimeContext() throws Exception {
        return createRuntimeContext(createMapPlainMockOp());
    }

    /** Runtime context backed by the mocked operator that serves real list state. */
    private StreamingRuntimeContext createRuntimeContext() throws Exception {
        return new StreamingRuntimeContext(
                createListPlainMockOp(), MockEnvironment.builder().build(), Collections.emptyMap());
    }

    /**
     * Runtime context of an operator whose keyed state backends are mocks that record the state
     * descriptor they receive.
     *
     * @param descriptorCapture receives the descriptor handed to the mocked backend
     * @param config execution config used for the environment, the task and serializer creation
     * @param stateV2 whether the keyed state store should be switched to the v2 state API
     */
    private StreamingRuntimeContext createRuntimeContext(
            AtomicReference<Object> descriptorCapture, ExecutionConfig config, boolean stateV2)
            throws Exception {
        Environment environment = MockEnvironment.builder().setExecutionConfig(config).build();
        return createDescriptorCapturingMockOp(descriptorCapture, config, environment, stateV2)
                .getRuntimeContext();
    }

    /** Runtime context assembled from the individual services exposed by {@code operator}. */
    private StreamingRuntimeContext createRuntimeContext(AbstractStreamOperator<?> operator) {
        return new StreamingRuntimeContext(
                MockEnvironment.builder().build(),
                Collections.emptyMap(),
                operator.getMetricGroup(),
                operator.getOperatorID(),
                operator.getProcessingTimeService(),
                operator.getKeyedStateStore(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
    }

    /**
     * Creates an operator whose runtime context uses a {@link DefaultKeyedStateStore} on top of
     * mocked sync and async keyed state backends. Both mocks store the third invocation argument
     * (the state descriptor) in {@code ref} and return {@code null}.
     */
    @SuppressWarnings("unchecked")
    private static AbstractStreamOperator<?> createDescriptorCapturingMockOp(
            AtomicReference<Object> ref,
            final ExecutionConfig config,
            Environment environment,
            boolean stateV2)
            throws Exception {
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setOperatorID(new OperatorID());

        AbstractStreamOperator<?> operator =
                new AbstractStreamOperator<Object>(
                        new StreamOperatorParameters<>(
                                new MockStreamTaskBuilder(environment)
                                        .setExecutionConfig(config)
                                        .build(),
                                streamConfig,
                                new CollectorOutput<>(new ArrayList<StreamElement>()),
                                TestProcessingTimeService::new,
                                null,
                                null)) {
                    @Override
                    protected void setup(
                            StreamTask<?, ?> containingTask,
                            StreamConfig operatorConfig,
                            Output<StreamRecord<Object>> output) {
                        super.setup(containingTask, operatorConfig, output);
                    }
                };

        StreamTaskStateInitializer streamTaskStateManager =
                new StreamTaskStateInitializerImpl(environment, new HashMapStateBackend());

        KeyedStateBackend<?> keyedStateBackend = Mockito.mock(KeyedStateBackend.class);
        AsyncKeyedStateBackend<?> asyncKeyedStateBackend =
                Mockito.mock(AsyncKeyedStateBackend.class);
        DefaultKeyedStateStore keyedStateStore =
                new DefaultKeyedStateStore(
                        keyedStateBackend, asyncKeyedStateBackend, serializerFactoryFor(config));

        // Argument index 2 is the state descriptor for both backend methods stubbed below.
        Answer<Object> captureDescriptor =
                invocation -> {
                    ref.set(invocation.getArguments()[2]);
                    return null;
                };
        Mockito.doAnswer(captureDescriptor)
                .when(keyedStateBackend)
                .getPartitionedState(
                        ArgumentMatchers.any(),
                        Mockito.any(TypeSerializer.class),
                        Mockito.any(StateDescriptor.class));
        Mockito.doAnswer(captureDescriptor)
                .when(asyncKeyedStateBackend)
                .getOrCreateKeyedState(
                        Mockito.any(),
                        Mockito.any(TypeSerializer.class),
                        Mockito.any(org.apache.flink.api.common.state.v2.StateDescriptor.class));

        operator.initializeState(streamTaskStateManager);
        if (stateV2) {
            keyedStateStore.setSupportKeyedStateApiSetV2();
        }
        // Replace the store set up by initializeState with the capturing one.
        operator.getRuntimeContext().setKeyedStateStore((KeyedStateStore) keyedStateStore);
        return operator;
    }

    /** Mocked operator whose keyed state backend answers list-state requests with real state. */
    private static AbstractStreamOperator<?> createListPlainMockOp() throws Exception {
        return createPlainMockOp(org.apache.flink.api.common.state.ListStateDescriptor.class);
    }

    /**
     * Mocked operator whose keyed state backend answers map-state requests with real state. Unlike
     * the list variant it also provides a {@link TestProcessingTimeService}.
     */
    private static AbstractStreamOperator<?> createMapPlainMockOp() throws Exception {
        AbstractStreamOperator<?> operatorMock = createPlainMockOp(MapStateDescriptor.class);
        Mockito.when(operatorMock.getProcessingTimeService())
                .thenReturn(new TestProcessingTimeService());
        return operatorMock;
    }

    /**
     * Creates a mocked operator that exposes an execution config, an operator id and a {@link
     * DefaultKeyedStateStore}. The store's mocked backend answers {@code getPartitionedState}
     * calls for descriptors of {@code descriptorType} with state from a fresh in-memory backend.
     *
     * @param descriptorType descriptor class the stubbed backend call should match
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static AbstractStreamOperator<?> createPlainMockOp(
            Class<? extends StateDescriptor> descriptorType) throws Exception {
        AbstractStreamOperator<?> operatorMock = Mockito.mock(AbstractStreamOperator.class);
        ExecutionConfig config = new ExecutionConfig();

        KeyedStateBackend<?> keyedStateBackend = Mockito.mock(KeyedStateBackend.class);
        DefaultKeyedStateStore keyedStateStore =
                new DefaultKeyedStateStore(keyedStateBackend, serializerFactoryFor(config));

        Mockito.when(operatorMock.getExecutionConfig()).thenReturn(config);
        Mockito.doAnswer(
                        invocation ->
                                getStateFromFreshInMemoryBackend(
                                        (StateDescriptor<?, ?>) invocation.getArguments()[2]))
                .when(keyedStateBackend)
                .getPartitionedState(
                        ArgumentMatchers.any(),
                        Mockito.any(TypeSerializer.class),
                        Mockito.any(descriptorType));
        Mockito.when(operatorMock.getKeyedStateStore()).thenReturn(keyedStateStore);
        Mockito.when(operatorMock.getOperatorID()).thenReturn(new OperatorID());
        return operatorMock;
    }

    /**
     * Creates a new {@link HashMapStateBackend}-based keyed backend with {@code Integer} keys,
     * sets the current key to {@code 0} and returns the state for {@code descriptor} in the
     * {@link VoidNamespace}.
     */
    private static Object getStateFromFreshInMemoryBackend(StateDescriptor<?, ?> descriptor)
            throws Exception {
        HashMapStateBackend stateBackend = new HashMapStateBackend();
        DummyEnvironment env = new DummyEnvironment("test_task", 1, 0);
        JobID jobID = new JobID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
        // Use the same job id for the registry and the backend so both describe one job.
        TaskKvStateRegistry kvStateRegistry =
                new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID());
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();

        AbstractKeyedStateBackend<Integer> backend =
                stateBackend.createKeyedStateBackend(
                        new KeyedStateBackendParametersImpl<>(
                                env,
                                jobID,
                                "test_op",
                                IntSerializer.INSTANCE,
                                1,
                                keyGroupRange,
                                kvStateRegistry,
                                TtlTimeProvider.DEFAULT,
                                new UnregisteredMetricsGroup(),
                                Collections.emptyList(),
                                cancelStreamRegistry));
        backend.setCurrentKey(0);
        return backend.getPartitionedState(
                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, descriptor);
    }
}

