/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.KvStateServerHandlerTest;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

class KvStateServerTest {
    private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
    private static final int TIMEOUT_MILLIS = 10000;

    KvStateServerTest() {
    }

    @AfterAll
    static void tearDown() throws Exception {
        if (NIO_GROUP != null) {
            NIO_GROUP.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSimpleRequest() throws Throwable {
        KvStateServerImpl server = null;
        Bootstrap bootstrap = null;
        try {
            KvStateRegistry registry = new KvStateRegistry();
            AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
            server = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), Integer.valueOf(1), Integer.valueOf(1), registry, (KvStateRequestStats)stats);
            server.start();
            InetSocketAddress serverAddress = server.getServerAddress();
            int numKeyGroups = 1;
            HashMapStateBackend abstractBackend = new HashMapStateBackend();
            DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
            dummyEnv.setKvStateRegistry(registry);
            JobID jobId = new JobID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            TaskKvStateRegistry kvStateRegistry = registry.createTaskRegistry(jobId, new JobVertexID());
            CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
            AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)dummyEnv, jobId, "test_op", (TypeSerializer)IntSerializer.INSTANCE, numKeyGroups, keyGroupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), cancelStreamRegistry));
            KvStateServerHandlerTest.TestRegistryListener registryListener = new KvStateServerHandlerTest.TestRegistryListener();
            registry.registerListener(jobId, (KvStateRegistryListener)registryListener);
            ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("vanilla");
            ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
            int expectedValue = 712828289;
            int key = 99812822;
            backend.setCurrentKey((Object)key);
            state.update((Object)expectedValue);
            byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)key, (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
            final LinkedBlockingQueue responses = new LinkedBlockingQueue();
            bootstrap = this.createBootstrap(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), new ChannelInboundHandlerAdapter(){

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    responses.add((ByteBuf)msg);
                }
            }});
            Channel channel = bootstrap.connect(serverAddress.getAddress(), serverAddress.getPort()).sync().channel();
            long requestId = 2147666475L;
            Assertions.assertThat((String)registryListener.registrationName).isEqualTo("vanilla");
            KvStateInternalRequest request = new KvStateInternalRequest(registryListener.kvStateId, serializedKeyAndNamespace);
            ByteBuf serializeRequest = MessageSerializer.serializeRequest((ByteBufAllocator)channel.alloc(), (long)requestId, (MessageBody)request);
            channel.writeAndFlush((Object)serializeRequest);
            ByteBuf buf = (ByteBuf)responses.poll(10000L, TimeUnit.MILLISECONDS);
            Assertions.assertThat((Comparable)MessageSerializer.deserializeHeader((ByteBuf)buf)).isEqualTo((Object)MessageType.REQUEST_RESULT);
            Assertions.assertThat((long)MessageSerializer.getRequestId((ByteBuf)buf)).isEqualTo(requestId);
            KvStateResponse response = (KvStateResponse)server.getSerializer().deserializeResponse(buf);
            int actualValue = (Integer)KvStateSerializer.deserializeValue((byte[])response.getContent(), (TypeSerializer)IntSerializer.INSTANCE);
            Assertions.assertThat((int)actualValue).isEqualTo(expectedValue);
        }
        finally {
            EventLoopGroup group;
            if (server != null) {
                server.shutdown();
            }
            if (bootstrap != null && (group = bootstrap.config().group()) != null) {
                group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
            }
        }
    }

    private Bootstrap createBootstrap(final ChannelHandler ... handlers) {
        return (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group((EventLoopGroup)NIO_GROUP)).channel(NioSocketChannel.class)).handler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(handlers);
            }
        });
    }
}

