/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api.input;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.mock.MockRestoreOperation;
import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
import org.apache.flink.state.api.input.MultiStateKeyIterator;
import org.junit.Assert;
import org.junit.Test;

public class MultiStateKeyIteratorTest {
    private static final List<ValueStateDescriptor<Integer>> descriptors = new ArrayList<ValueStateDescriptor<Integer>>(2);

    private static AbstractKeyedStateBackend<Integer> createKeyedStateBackend() {
        MockStateBackend backend = new MockStateBackend();
        DummyEnvironment env = new DummyEnvironment();
        JobID jobID = new JobID();
        KeyGroupRange keyGroupRange = KeyGroupRange.of((int)0, (int)128);
        TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        return backend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)env, jobID, "mock-backend", (TypeSerializer)IntSerializer.INSTANCE, 129, keyGroupRange, (TaskKvStateRegistry)null, TtlTimeProvider.DEFAULT, (MetricGroup)metricGroup, Collections.emptyList(), cancelStreamRegistry));
    }

    private static CountingKeysKeyedStateBackend createCountingKeysKeyedStateBackend(Integer numKeys) {
        DummyEnvironment env = new DummyEnvironment();
        IntSerializer keySerializer = IntSerializer.INSTANCE;
        int numberOfKeyGroups = 129;
        KeyGroupRange keyGroupRange = KeyGroupRange.of((int)0, (int)128);
        TaskKvStateRegistry kvStateRegistry = null;
        TtlTimeProvider ttlTimeProvider = TtlTimeProvider.DEFAULT;
        List stateHandles = Collections.emptyList();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        HashMap stateValues = new HashMap();
        MockRestoreOperation restoreOperation = new MockRestoreOperation(stateHandles, stateValues);
        restoreOperation.restore();
        StateSerializerProvider keySerializerProvider = StateSerializerProvider.fromNewRegisteredSerializer((TypeSerializer)keySerializer);
        return new CountingKeysKeyedStateBackend(numKeys, kvStateRegistry, (TypeSerializer<Integer>)keySerializerProvider.currentSchemaSerializer(), env.getUserCodeClassLoader().asClassLoader(), env.getExecutionConfig(), ttlTimeProvider, LatencyTrackingStateConfig.disabled(), cancelStreamRegistry, (InternalKeyContext<Integer>)new InternalKeyContextImpl(keyGroupRange, numberOfKeyGroups));
    }

    private static void setKey(AbstractKeyedStateBackend<Integer> backend, ValueStateDescriptor<Integer> descriptor, int key) throws Exception {
        backend.setCurrentKey((Object)key);
        ((ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, descriptor)).update((Object)0);
    }

    private static void clearKey(AbstractKeyedStateBackend<Integer> backend, ValueStateDescriptor<Integer> descriptor, int key) throws Exception {
        backend.setCurrentKey((Object)key);
        ((ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, descriptor)).clear();
    }

    @Test
    public void testIteratorPullsKeyFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> keyedStateBackend = MultiStateKeyIteratorTest.createKeyedStateBackend();
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, descriptors.get(0), 1);
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, descriptors.get(1), 2);
        MultiStateKeyIterator iterator = new MultiStateKeyIterator(descriptors, keyedStateBackend);
        ArrayList<Integer> keys = new ArrayList<Integer>();
        while (iterator.hasNext()) {
            keys.add((Integer)iterator.next());
        }
        Assert.assertEquals((String)"Unexpected number of keys", (long)2L, (long)keys.size());
        Assert.assertEquals((String)"Unexpected keys found", Arrays.asList(1, 2), keys);
    }

    @Test
    public void testIteratorSkipsEmptyDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> keyedStateBackend = MultiStateKeyIteratorTest.createKeyedStateBackend();
        ArrayList<ValueStateDescriptor> threeDescriptors = new ArrayList<ValueStateDescriptor>(3);
        threeDescriptors.add(new ValueStateDescriptor("state-1", Types.INT));
        threeDescriptors.add(new ValueStateDescriptor("state-2", Types.INT));
        threeDescriptors.add(new ValueStateDescriptor("state-3", Types.INT));
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, (ValueStateDescriptor<Integer>)((ValueStateDescriptor)threeDescriptors.get(0)), 1);
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, (ValueStateDescriptor<Integer>)((ValueStateDescriptor)threeDescriptors.get(1)), 12);
        MultiStateKeyIteratorTest.clearKey(keyedStateBackend, (ValueStateDescriptor<Integer>)((ValueStateDescriptor)threeDescriptors.get(1)), 12);
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, (ValueStateDescriptor<Integer>)((ValueStateDescriptor)threeDescriptors.get(2)), 2);
        MultiStateKeyIterator iterator = new MultiStateKeyIterator(threeDescriptors, keyedStateBackend);
        ArrayList<Integer> keys = new ArrayList<Integer>();
        while (iterator.hasNext()) {
            keys.add((Integer)iterator.next());
        }
        Assert.assertEquals((String)"Unexpected number of keys", (long)2L, (long)keys.size());
        Assert.assertEquals((String)"Unexpected keys found", Arrays.asList(1, 2), keys);
    }

    @Test
    public void testIteratorRemovesFromAllDescriptors() throws Exception {
        AbstractKeyedStateBackend<Integer> keyedStateBackend = MultiStateKeyIteratorTest.createKeyedStateBackend();
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, descriptors.get(0), 1);
        MultiStateKeyIteratorTest.setKey(keyedStateBackend, descriptors.get(1), 1);
        MultiStateKeyIterator iterator = new MultiStateKeyIterator(descriptors, keyedStateBackend);
        int key = (Integer)iterator.next();
        Assert.assertEquals((String)"Unexpected keys pulled from state backend", (long)1L, (long)key);
        iterator.remove();
        Assert.assertFalse((String)"Failed to drop key from all descriptors in state backend", (boolean)iterator.hasNext());
        for (StateDescriptor stateDescriptor : descriptors) {
            Assert.assertEquals((String)"Failed to drop key for state descriptor", (long)0L, (long)keyedStateBackend.getKeys(stateDescriptor.getName(), (Object)VoidNamespace.INSTANCE).count());
        }
    }

    @Test
    public void testIteratorPullsSingleKeyFromAllDescriptors() throws AssertionError {
        CountingKeysKeyedStateBackend keyedStateBackend = MultiStateKeyIteratorTest.createCountingKeysKeyedStateBackend(100000000);
        MultiStateKeyIterator testedIterator = new MultiStateKeyIterator(descriptors, (KeyedStateBackend)keyedStateBackend);
        testedIterator.hasNext();
        Assert.assertEquals((String)"Unexpected number of keys enumerated", (long)1L, (long)keyedStateBackend.numberOfKeysEnumerated);
    }

    static {
        descriptors.add((ValueStateDescriptor<Integer>)new ValueStateDescriptor("state-1", Types.INT));
        descriptors.add((ValueStateDescriptor<Integer>)new ValueStateDescriptor("state-2", Types.INT));
    }

    static class CountingKeysKeyedStateBackend
    extends AbstractKeyedStateBackend<Integer> {
        int numberOfKeysGenerated;
        public long numberOfKeysEnumerated;

        public CountingKeysKeyedStateBackend(int numberOfKeysGenerated, TaskKvStateRegistry kvStateRegistry, TypeSerializer<Integer> keySerializer, ClassLoader userCodeClassLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry cancelStreamRegistry, InternalKeyContext<Integer> keyContext) {
            super(kvStateRegistry, keySerializer, userCodeClassLoader, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, cancelStreamRegistry, keyContext);
            this.numberOfKeysGenerated = numberOfKeysGenerated;
            this.numberOfKeysEnumerated = 0L;
        }

        public <N> Stream<Integer> getKeys(String state, N namespace) {
            return IntStream.range(0, this.numberOfKeysGenerated).boxed().peek(i -> ++this.numberOfKeysEnumerated);
        }

        public int numKeyValueStateEntries() {
            return this.numberOfKeysGenerated;
        }

        @Nonnull
        public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        public void notifyCheckpointComplete(long checkpointId) {
        }

        @Nonnull
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        @Nonnull
        public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        public <N> Stream<Tuple2<Integer, N>> getKeysAndNamespaces(String state) {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }

        @Nonnull
        public SavepointResources<Integer> savepoint() throws UnsupportedOperationException {
            throw new UnsupportedOperationException("Operations other than getKeys() are not supported on this testing StateBackend.");
        }
    }
}

