/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class KvStateRequestSerializerTest {
    KvStateRequestSerializerTest() {
    }

    public static Collection<Boolean> data() {
        return Arrays.asList(false, true);
    }

    @Test
    void testKeyAndNamespaceSerialization() throws Exception {
        LongSerializer keySerializer = LongSerializer.INSTANCE;
        StringSerializer namespaceSerializer = StringSerializer.INSTANCE;
        long expectedKey = 2147495970L;
        String expectedNamespace = "knilf";
        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)expectedKey, (TypeSerializer)keySerializer, (Object)expectedNamespace, (TypeSerializer)namespaceSerializer);
        Tuple2 actual = KvStateSerializer.deserializeKeyAndNamespace((byte[])serializedKeyAndNamespace, (TypeSerializer)keySerializer, (TypeSerializer)namespaceSerializer);
        Assertions.assertThat((long)((Long)actual.f0)).isEqualTo(expectedKey);
        Assertions.assertThat((String)((String)actual.f1)).isEqualTo(expectedNamespace);
    }

    @Test
    void testKeyAndNamespaceDeserializationEmpty() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeKeyAndNamespace((byte[])new byte[0], (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testKeyAndNamespaceDeserializationTooShort() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeKeyAndNamespace((byte[])new byte[]{1}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testKeyAndNamespaceDeserializationTooMany1() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeKeyAndNamespace((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testKeyAndNamespaceDeserializationTooMany2() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeKeyAndNamespace((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 42, 0, 2, 2}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testValueSerialization() throws Exception {
        LongSerializer valueSerializer = LongSerializer.INSTANCE;
        long expectedValue = 9223372035561846515L;
        byte[] serializedValue = KvStateSerializer.serializeValue((Object)expectedValue, (TypeSerializer)valueSerializer);
        long actualValue = (Long)KvStateSerializer.deserializeValue((byte[])serializedValue, (TypeSerializer)valueSerializer);
        Assertions.assertThat((long)actualValue).isEqualTo(expectedValue);
    }

    @Test
    void testDeserializeValueEmpty() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeValue((byte[])new byte[0], (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeValueTooShort() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeValue((byte[])new byte[]{1}, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeValueTooMany1() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeValue((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 2}, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeValueTooMany2() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeValue((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 2, 2}, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    void testListSerialization(boolean async) throws Exception {
        long key = 0L;
        HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = this.getLongHeapKeyedStateBackend(0L, async);
        InternalListState listState = (InternalListState)longHeapKeyedStateBackend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ListStateDescriptor("test", (TypeSerializer)LongSerializer.INSTANCE));
        KvStateRequestSerializerTest.testListSerialization(0L, (InternalListState<Long, VoidNamespace, Long>)listState);
    }

    public static void testListSerialization(long key, InternalListState<Long, VoidNamespace, Long> listState) throws Exception {
        LongSerializer valueSerializer = LongSerializer.INSTANCE;
        listState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numElements = 10;
        ArrayList<Long> expectedValues = new ArrayList<Long>();
        for (int i = 0; i < 10; ++i) {
            long value = ThreadLocalRandom.current().nextLong();
            expectedValues.add(value);
            listState.add((Object)value);
        }
        byte[] serializedKey = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)LongSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        byte[] serializedValues = listState.getSerializedValue(serializedKey, listState.getKeySerializer(), listState.getNamespaceSerializer(), listState.getValueSerializer());
        List actualValues = KvStateSerializer.deserializeList((byte[])serializedValues, (TypeSerializer)valueSerializer);
        Assertions.assertThat((List)actualValues).isEqualTo(expectedValues);
        long expectedValue = ThreadLocalRandom.current().nextLong();
        byte[] serializedValue = KvStateSerializer.serializeValue((Object)expectedValue, (TypeSerializer)valueSerializer);
        List actualValue = KvStateSerializer.deserializeList((byte[])serializedValue, (TypeSerializer)valueSerializer);
        Assertions.assertThat((List)actualValue).containsExactly((Object[])new Long[]{expectedValue});
    }

    @Test
    void testDeserializeListEmpty() throws Exception {
        List actualValue = KvStateSerializer.deserializeList((byte[])new byte[0], (TypeSerializer)LongSerializer.INSTANCE);
        Assertions.assertThat((List)actualValue).isEmpty();
    }

    @Test
    void testDeserializeListTooShort1() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeList((byte[])new byte[]{1}, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeListTooShort2() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeList((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 2, 3}, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @ParameterizedTest
    @MethodSource(value={"data"})
    public void testMapSerialization(boolean async) throws Exception {
        long key = 0L;
        HeapKeyedStateBackend<Long> longHeapKeyedStateBackend = this.getLongHeapKeyedStateBackend(0L, async);
        InternalMapState mapState = (InternalMapState)longHeapKeyedStateBackend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor("test", (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE));
        KvStateRequestSerializerTest.testMapSerialization(0L, (InternalMapState<Long, VoidNamespace, Long, String>)mapState);
    }

    private HeapKeyedStateBackend<Long> getLongHeapKeyedStateBackend(long key, boolean async) throws BackendBuildingException {
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
        ExecutionConfig executionConfig = new ExecutionConfig();
        TaskKvStateRegistry taskKvStateRegistry = new KvStateRegistry().createTaskRegistry(JobID.generate(), new JobVertexID());
        HeapKeyedStateBackend longHeapKeyedStateBackend = new HeapKeyedStateBackendBuilder(taskKvStateRegistry, (TypeSerializer)LongSerializer.INSTANCE, ClassLoader.getSystemClassLoader(), keyGroupRange.getNumberOfKeyGroups(), keyGroupRange, executionConfig, TtlTimeProvider.DEFAULT, LatencyTrackingStateConfig.disabled(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator((ExecutionConfig)executionConfig), TestLocalRecoveryConfig.disabled(), new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), async, new CloseableRegistry()).build();
        longHeapKeyedStateBackend.setCurrentKey((Object)key);
        return longHeapKeyedStateBackend;
    }

    public static void testMapSerialization(long key, InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {
        LongSerializer userKeySerializer = LongSerializer.INSTANCE;
        StringSerializer userValueSerializer = StringSerializer.INSTANCE;
        mapState.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        int numElements = 10;
        HashMap<Long, String> expectedValues = new HashMap<Long, String>();
        for (int i = 1; i <= 10; ++i) {
            long value = ThreadLocalRandom.current().nextLong();
            expectedValues.put(value, Long.toString(value));
            mapState.put((Object)value, (Object)Long.toString(value));
        }
        expectedValues.put(0L, null);
        mapState.put((Object)0L, null);
        byte[] serializedKey = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)LongSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
        byte[] serializedValues = mapState.getSerializedValue(serializedKey, mapState.getKeySerializer(), mapState.getNamespaceSerializer(), mapState.getValueSerializer());
        Map actualValues = KvStateSerializer.deserializeMap((byte[])serializedValues, (TypeSerializer)userKeySerializer, (TypeSerializer)userValueSerializer);
        Assertions.assertThat((Map)actualValues).hasSize(expectedValues.size());
        for (Map.Entry actualEntry : actualValues.entrySet()) {
            Assertions.assertThat((String)((String)actualEntry.getValue())).isEqualTo((String)expectedValues.get(actualEntry.getKey()));
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        long expectedKey = ThreadLocalRandom.current().nextLong();
        String expectedValue = Long.toString(expectedKey);
        byte[] isNull = new byte[]{0};
        baos.write(KvStateSerializer.serializeValue((Object)expectedKey, (TypeSerializer)userKeySerializer));
        baos.write(isNull);
        baos.write(KvStateSerializer.serializeValue((Object)expectedValue, (TypeSerializer)userValueSerializer));
        byte[] serializedValue = baos.toByteArray();
        Map actualValue = KvStateSerializer.deserializeMap((byte[])serializedValue, (TypeSerializer)userKeySerializer, (TypeSerializer)userValueSerializer);
        Assertions.assertThat((Map)actualValue).hasSize(1);
        Assertions.assertThat((String)((String)actualValue.get(expectedKey))).isEqualTo(expectedValue);
    }

    @Test
    void testDeserializeMapEmpty() throws Exception {
        Map actualValue = KvStateSerializer.deserializeMap((byte[])new byte[0], (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE);
        Assertions.assertThat((Map)actualValue).hasSize(0);
    }

    @Test
    void testDeserializeMapTooShort1() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeMap((byte[])new byte[]{1}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeMapTooShort2() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeMap((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    @Test
    void testDeserializeMapTooShort3() throws Exception {
        Assertions.assertThatThrownBy(() -> KvStateSerializer.deserializeMap((byte[])new byte[]{1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 3}, (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE)).isInstanceOf(IOException.class);
    }

    private byte[] randomByteArray(int capacity) {
        byte[] bytes = new byte[capacity];
        ThreadLocalRandom.current().nextBytes(bytes);
        return bytes;
    }
}

