/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.messages.MessageType;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.queryablestate.server.KvStateServerImpl;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ClientTest {
    private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
    private NioEventLoopGroup nioGroup;

    ClientTest() {
    }

    @BeforeEach
    void setUp() {
        this.nioGroup = new NioEventLoopGroup();
    }

    @AfterEach
    void tearDown() {
        if (this.nioGroup != null) {
            this.nioGroup.shutdownGracefully();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSimpleRequests() throws Exception {
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            long i;
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            byte[] expected = new byte[1024];
            ThreadLocalRandom.current().nextBytes(expected);
            LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<ByteBuf>();
            AtomicReference<Channel> channel = new AtomicReference<Channel>();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelDataCollectingHandler(channel, received)});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            long numQueries = 1024L;
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
            for (long i2 = 0L; i2 < numQueries; ++i2) {
                KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                futures.add(client.sendRequest(serverAddress, (MessageBody)request));
            }
            RuntimeException testException = new RuntimeException("Expected test Exception");
            for (i = 0L; i < numQueries; ++i) {
                ByteBuf response;
                ByteBuf buf = received.take();
                ((AbstractComparableAssert)Assertions.assertThat((Comparable)buf).withFailMessage("Receive timed out", new Object[0])).isNotNull();
                Channel ch = channel.get();
                ((AbstractComparableAssert)Assertions.assertThat((Comparable)ch).withFailMessage("Channel not active", new Object[0])).isNotNull();
                Assertions.assertThat((Comparable)MessageType.REQUEST).isEqualTo((Object)MessageSerializer.deserializeHeader((ByteBuf)buf));
                long requestId = MessageSerializer.getRequestId((ByteBuf)buf);
                buf.release();
                if (i % 2L == 0L) {
                    response = MessageSerializer.serializeResponse((ByteBufAllocator)serverChannel.alloc(), (long)requestId, (MessageBody)new KvStateResponse(expected));
                    ch.writeAndFlush((Object)response);
                    continue;
                }
                response = MessageSerializer.serializeRequestFailure((ByteBufAllocator)serverChannel.alloc(), (long)requestId, (Throwable)testException);
                ch.writeAndFlush((Object)response);
            }
            for (i = 0L; i < numQueries; ++i) {
                if (i % 2L == 0L) {
                    KvStateResponse serializedResult = (KvStateResponse)((CompletableFuture)futures.get((int)i)).get();
                    Assertions.assertThat((byte[])expected).containsExactly(serializedResult.getContent());
                    continue;
                }
                CompletableFuture future = (CompletableFuture)futures.get((int)i);
                FlinkAssertions.assertThatFuture((CompletableFuture)future).eventuallyFailsWith(ExecutionException.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RuntimeException.class)});
            }
            Assertions.assertThat((long)numQueries).isEqualTo(stats.getNumRequests());
            long expectedRequests = numQueries / 2L;
            while (stats.getNumSuccessful() != expectedRequests || stats.getNumFailed() != expectedRequests) {
                Thread.sleep(100L);
            }
            Assertions.assertThat((long)expectedRequests).isEqualTo(stats.getNumSuccessful());
            Assertions.assertThat((long)expectedRequests).isEqualTo(stats.getNumFailed());
        }
        finally {
            if (client != null) {
                Exception exc = null;
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    exc = e;
                    LOG.error("An exception occurred while shutting down netty.", (Throwable)e);
                }
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)client.isEventGroupShutdown()).withFailMessage(ExceptionUtils.stringifyException((Throwable)exc), new Object[0])).isTrue();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            ((AbstractLongAssert)Assertions.assertThat((long)stats.getNumConnections()).withFailMessage("Channel leak", new Object[0])).isZero();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testRequestUnavailableHost() {
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        try {
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            InetSocketAddress serverAddress = new InetSocketAddress("flink-qs-client-test-unavailable-host", 12345);
            KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
            CompletableFuture future = client.sendRequest(serverAddress, (MessageBody)request);
            Assertions.assertThat((CompletableFuture)future).isNotNull();
            Assertions.assertThatThrownBy(future::get).hasRootCauseInstanceOf(ConnectException.class);
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            }
            ((AbstractLongAssert)Assertions.assertThat((long)stats.getNumConnections()).withFailMessage("Channel leak", new Object[0])).isZero();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentQueries() throws Exception {
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        ExecutorService executor = null;
        Client client = null;
        Channel serverChannel = null;
        byte[] serializedResult = new byte[1024];
        ThreadLocalRandom.current().nextBytes(serializedResult);
        try {
            int numQueryTasks = 4;
            int numQueriesPerTask = 1024;
            executor = Executors.newFixedThreadPool(numQueryTasks);
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            serverChannel = this.createServerChannel(new ChannelHandler[]{new RespondingChannelHandler((MessageSerializer<KvStateInternalRequest, KvStateResponse>)serializer, serializedResult)});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            Client finalClient = client;
            Callable<List> queryTask = () -> {
                ArrayList<CompletableFuture> results = new ArrayList<CompletableFuture>(1024);
                for (int i = 0; i < 1024; ++i) {
                    KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
                    results.add(finalClient.sendRequest(serverAddress, (MessageBody)request));
                }
                return results;
            };
            ArrayList<Future<List>> futures = new ArrayList<Future<List>>();
            for (int i = 0; i < numQueryTasks; ++i) {
                futures.add(executor.submit(queryTask));
            }
            for (Future future : futures) {
                List results = (List)future.get();
                for (CompletableFuture result : results) {
                    KvStateResponse actual = (KvStateResponse)result.get();
                    Assertions.assertThat((byte[])serializedResult).containsExactly(actual.getContent());
                }
            }
            int totalQueries = numQueryTasks * 1024;
            while (stats.getNumSuccessful() != (long)totalQueries) {
                Thread.sleep(100L);
            }
            Assertions.assertThat((int)totalQueries).isEqualTo(stats.getNumRequests());
            Assertions.assertThat((int)totalQueries).isEqualTo(stats.getNumSuccessful());
        }
        finally {
            if (executor != null) {
                executor.shutdown();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            if (client != null) {
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            }
            ((AbstractLongAssert)Assertions.assertThat((long)stats.getNumConnections()).withFailMessage("Channel leak", new Object[0])).isZero();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testFailureClosesChannel() throws Exception {
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<ByteBuf>();
            AtomicReference<Channel> channel = new AtomicReference<Channel>();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelDataCollectingHandler(channel, received)});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>();
            KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
            futures.add(client.sendRequest(serverAddress, (MessageBody)request));
            futures.add(client.sendRequest(serverAddress, (MessageBody)request));
            ByteBuf buf = received.take();
            ((AbstractComparableAssert)Assertions.assertThat((Comparable)buf).withFailMessage("Receive timed out", new Object[0])).isNotNull();
            buf.release();
            buf = received.take();
            ((AbstractComparableAssert)Assertions.assertThat((Comparable)buf).withFailMessage("Receive timed out", new Object[0])).isNotNull();
            buf.release();
            Assertions.assertThat((long)stats.getNumConnections()).isEqualTo(1L);
            Channel ch = channel.get();
            ((AbstractComparableAssert)Assertions.assertThat((Comparable)ch).withFailMessage("Channel not active", new Object[0])).isNotNull();
            ch.writeAndFlush((Object)MessageSerializer.serializeServerFailure((ByteBufAllocator)serverChannel.alloc(), (Throwable)new RuntimeException("Expected test server failure")));
            CompletableFuture removedFuture = (CompletableFuture)futures.remove(0);
            FlinkAssertions.assertThatFuture((CompletableFuture)removedFuture).eventuallyFailsWith(ExecutionException.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RuntimeException.class)});
            removedFuture = (CompletableFuture)futures.remove(0);
            FlinkAssertions.assertThatFuture((CompletableFuture)removedFuture).eventuallyFailsWith(ExecutionException.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(RuntimeException.class)});
            Assertions.assertThat((long)stats.getNumConnections()).isZero();
            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L) {
                Thread.sleep(100L);
            }
            Assertions.assertThat((long)stats.getNumRequests()).isEqualTo(2L);
            Assertions.assertThat((long)stats.getNumSuccessful()).isZero();
            Assertions.assertThat((long)stats.getNumFailed()).isEqualTo(2L);
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            ((AbstractLongAssert)Assertions.assertThat((long)stats.getNumConnections()).withFailMessage("Channel leak", new Object[0])).isZero();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testServerClosesChannel() throws Exception {
        AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        Channel serverChannel = null;
        try {
            client = new Client("Test Client", 1, serializer, (KvStateRequestStats)stats);
            LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<ByteBuf>();
            AtomicReference<Channel> channel = new AtomicReference<Channel>();
            serverChannel = this.createServerChannel(new ChannelHandler[]{new ChannelDataCollectingHandler(channel, received)});
            InetSocketAddress serverAddress = this.getKvStateServerAddress(serverChannel);
            KvStateInternalRequest request = new KvStateInternalRequest(new KvStateID(), new byte[0]);
            CompletableFuture future = client.sendRequest(serverAddress, (MessageBody)request);
            received.take();
            Assertions.assertThat((long)stats.getNumConnections()).isEqualTo(1L);
            channel.get().close().await();
            FlinkAssertions.assertThatFuture((CompletableFuture)future).eventuallyFailsWith(ExecutionException.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches(ClosedChannelException.class)});
            Assertions.assertThat((long)stats.getNumConnections()).isZero();
            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L) {
                Thread.sleep(100L);
            }
            Assertions.assertThat((long)stats.getNumRequests()).isEqualTo(1L);
            Assertions.assertThat((long)stats.getNumSuccessful()).isZero();
            Assertions.assertThat((long)stats.getNumFailed()).isEqualTo(1L);
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
            ((AbstractLongAssert)Assertions.assertThat((long)stats.getNumConnections()).withFailMessage("Channel leak", new Object[0])).isZero();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testClientServerIntegration() throws Throwable {
        int numServers = 2;
        int numServerEventLoopThreads = 2;
        int numServerQueryThreads = 2;
        int numClientEventLoopThreads = 4;
        int numClientsTasks = 8;
        int batchSize = 16;
        boolean numKeyGroups = true;
        HashMapStateBackend abstractBackend = new HashMapStateBackend();
        KvStateRegistry dummyRegistry = new KvStateRegistry();
        DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
        dummyEnv.setKvStateRegistry(dummyRegistry);
        JobID jobID = new JobID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
        TaskKvStateRegistry kvStateRegistry = dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID());
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        AbstractKeyedStateBackend backend = abstractBackend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)dummyEnv, jobID, "test_op", (TypeSerializer)IntSerializer.INSTANCE, 1, keyGroupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), cancelStreamRegistry));
        AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        Client client = null;
        ExecutorService clientTaskExecutor = null;
        KvStateServerImpl[] server = new KvStateServerImpl[2];
        try {
            long numRequests;
            client = new Client("Test Client", 4, serializer, (KvStateRequestStats)clientStats);
            clientTaskExecutor = Executors.newFixedThreadPool(8);
            ValueStateDescriptor desc = new ValueStateDescriptor("any", (TypeSerializer)IntSerializer.INSTANCE);
            desc.setQueryable("any");
            KvStateRegistry[] registry = new KvStateRegistry[2];
            AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[2];
            KvStateID[] ids = new KvStateID[2];
            for (int i = 0; i < 2; ++i) {
                registry[i] = new KvStateRegistry();
                serverStats[i] = new AtomicKvStateRequestStats();
                server[i] = new KvStateServerImpl(InetAddress.getLocalHost().getHostName(), Collections.singletonList(0).iterator(), Integer.valueOf(2), Integer.valueOf(2), registry[i], (KvStateRequestStats)serverStats[i]);
                server[i].start();
                backend.setCurrentKey((Object)(1010 + i));
                ValueState state = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)desc);
                state.update((Object)(201 + i));
                InternalKvState kvState = (InternalKvState)state;
                ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState, this.getClass().getClassLoader());
            }
            Client finalClient = client;
            Callable<Void> queryTask = () -> {
                block0: while (true) {
                    int targetServer;
                    int j;
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }
                    ArrayList<Integer> random = new ArrayList<Integer>();
                    for (int j2 = 0; j2 < 16; ++j2) {
                        random.add(j2);
                    }
                    Collections.shuffle(random);
                    ArrayList<CompletableFuture> futures = new ArrayList<CompletableFuture>(16);
                    for (j = 0; j < 16; ++j) {
                        targetServer = (Integer)random.get(j) % 2;
                        byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace((Object)(1010 + targetServer), (TypeSerializer)IntSerializer.INSTANCE, (Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE);
                        KvStateInternalRequest request = new KvStateInternalRequest(ids[targetServer], serializedKeyAndNamespace);
                        futures.add(finalClient.sendRequest(server[targetServer].getServerAddress(), (MessageBody)request));
                    }
                    j = 0;
                    while (true) {
                        if (j >= 16) continue block0;
                        targetServer = (Integer)random.get(j) % 2;
                        Future future = (Future)futures.get(j);
                        byte[] buf = ((KvStateResponse)future.get()).getContent();
                        int value = (Integer)KvStateSerializer.deserializeValue((byte[])buf, (TypeSerializer)IntSerializer.INSTANCE);
                        Assertions.assertThat((int)value).isEqualTo(201L + (long)targetServer);
                        ++j;
                    }
                    break;
                }
            };
            ArrayList<Future<Void>> taskFutures = new ArrayList<Future<Void>>();
            for (int i = 0; i < 8; ++i) {
                taskFutures.add(clientTaskExecutor.submit(queryTask));
            }
            while ((numRequests = clientStats.getNumRequests()) < 100000L) {
                Thread.sleep(100L);
                LOG.info("Number of requests {}/100_000", (Object)numRequests);
            }
            try {
                client.shutdown().get();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            for (Future future : taskFutures) {
                try {
                    future.get();
                }
                catch (Throwable throwable) {
                    FlinkAssertions.assertThatChainOfCauses((Throwable)throwable).anySatisfy(cause -> Assertions.assertThat((Throwable)cause).isInstanceOfAny(new Class[]{ClosedChannelException.class, IllegalStateException.class}));
                }
            }
            ((AbstractLongAssert)Assertions.assertThat((long)clientStats.getNumConnections()).withFailMessage("Connection leak (client)", new Object[0])).isZero();
            for (int i = 0; i < 2; ++i) {
                boolean bl = false;
                int numRetries = 0;
                while (!bl) {
                    try {
                        ((AbstractLongAssert)Assertions.assertThat((long)serverStats[i].getNumConnections()).withFailMessage("Connection leak (server)", new Object[0])).isZero();
                        bl = true;
                    }
                    catch (Throwable t) {
                        if (numRetries < 10) {
                            LOG.info("Retrying connection leak check (server)");
                            Thread.sleep((long)(numRetries + 1) * 50L);
                            ++numRetries;
                            continue;
                        }
                        throw t;
                    }
                }
            }
        }
        finally {
            if (client != null) {
                try {
                    client.shutdown().get();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
                Assertions.assertThat((boolean)client.isEventGroupShutdown()).isTrue();
            }
            for (int i = 0; i < 2; ++i) {
                if (server[i] == null) continue;
                server[i].shutdown();
            }
            if (clientTaskExecutor != null) {
                clientTaskExecutor.shutdown();
            }
        }
    }

    private Channel createServerChannel(final ChannelHandler ... handlers) throws UnknownHostException, InterruptedException {
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().localAddress(InetAddress.getLocalHost(), 0)).group((EventLoopGroup)this.nioGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)}).addLast(handlers);
            }
        });
        return bootstrap.bind().sync().channel();
    }

    private InetSocketAddress getKvStateServerAddress(Channel serverChannel) {
        return (InetSocketAddress)serverChannel.localAddress();
    }

    private static class ChannelDataCollectingHandler
    extends ChannelInboundHandlerAdapter {
        private final AtomicReference<Channel> channel;
        private final LinkedBlockingQueue<ByteBuf> received;

        private ChannelDataCollectingHandler(AtomicReference<Channel> channel, LinkedBlockingQueue<ByteBuf> received) {
            this.channel = channel;
            this.received = received;
        }

        public void channelActive(ChannelHandlerContext ctx) {
            this.channel.set(ctx.channel());
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            this.received.add((ByteBuf)msg);
        }
    }

    @ChannelHandler.Sharable
    private static final class RespondingChannelHandler
    extends ChannelInboundHandlerAdapter {
        private final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
        private final byte[] serializedResult;

        private RespondingChannelHandler(MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer, byte[] serializedResult) {
            this.serializer = serializer;
            this.serializedResult = serializedResult;
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buf = (ByteBuf)msg;
            Assertions.assertThat((Comparable)MessageSerializer.deserializeHeader((ByteBuf)buf)).isEqualTo((Object)MessageType.REQUEST);
            long requestId = MessageSerializer.getRequestId((ByteBuf)buf);
            KvStateInternalRequest request = (KvStateInternalRequest)this.serializer.deserializeRequest(buf);
            buf.release();
            KvStateResponse response = new KvStateResponse(this.serializedResult);
            ByteBuf serResponse = MessageSerializer.serializeResponse((ByteBufAllocator)ctx.alloc(), (long)requestId, (MessageBody)response);
            ctx.channel().writeAndFlush((Object)serResponse);
        }
    }
}

