/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.queryablestate.network.AbstractServerBase;
import org.apache.flink.queryablestate.network.AbstractServerHandler;
import org.apache.flink.queryablestate.network.Client;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.queryablestate.network.stats.AtomicKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AbstractServerTest {
    AbstractServerTest() {
    }

    @Test
    void testServerInitializationFailure() throws Throwable {
        List<Integer> portList = Collections.singletonList(0);
        try (TestServer server1 = new TestServer("Test Server 1", (KvStateRequestStats)new DisabledKvStateRequestStats(), portList.iterator());){
            server1.start();
            try (TestServer server2 = new TestServer("Test Server 2", (KvStateRequestStats)new DisabledKvStateRequestStats(), Collections.singletonList(server1.getServerAddress().getPort()).iterator());){
                Assertions.assertThatThrownBy(() -> server2.start()).hasMessage("Unable to start Test Server 2. All ports in provided range are occupied.");
            }
        }
    }

    @Test
    void testPortRangeSuccess() throws Throwable {
        AtomicKvStateRequestStats serverStats1 = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats serverStats2 = new AtomicKvStateRequestStats();
        AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
        int portRangeStart = 7777;
        int portRangeEnd = 7900;
        List portList = IntStream.range(7777, 7901).boxed().collect(Collectors.toList());
        try (TestServer server1 = new TestServer("Test Server 1", (KvStateRequestStats)serverStats1, portList.iterator());
             TestServer server2 = new TestServer("Test Server 2", (KvStateRequestStats)serverStats2, portList.iterator());
             TestClient client = new TestClient("Test Client", 1, (MessageSerializer<TestMessage, TestMessage>)new MessageSerializer((MessageDeserializer)new TestMessage.TestMessageDeserializer(), (MessageDeserializer)new TestMessage.TestMessageDeserializer()), (KvStateRequestStats)clientStats);){
            server1.start();
            Assertions.assertThat((int)server1.getServerAddress().getPort()).isGreaterThanOrEqualTo(7777);
            Assertions.assertThat((int)server1.getServerAddress().getPort()).isLessThanOrEqualTo(7900);
            server2.start();
            Assertions.assertThat((int)server2.getServerAddress().getPort()).isGreaterThanOrEqualTo(7777);
            Assertions.assertThat((int)server2.getServerAddress().getPort()).isLessThanOrEqualTo(7900);
            TestMessage response1 = (TestMessage)((Object)client.sendRequest(server1.getServerAddress(), new TestMessage("ping")).join());
            Assertions.assertThat((String)response1.getMessage()).isEqualTo(server1.getServerName() + "-ping");
            TestMessage response2 = (TestMessage)((Object)client.sendRequest(server2.getServerAddress(), new TestMessage("pong")).join());
            Assertions.assertThat((String)response2.getMessage()).isEqualTo(server2.getServerName() + "-pong");
            Assertions.assertThat((long)serverStats1.getNumConnections()).isEqualTo(1L);
            Assertions.assertThat((long)serverStats2.getNumConnections()).isEqualTo(1L);
            Assertions.assertThat((long)clientStats.getNumConnections()).isEqualTo(2L);
            Assertions.assertThat((long)clientStats.getNumFailed()).isEqualTo(0L);
            Assertions.assertThat((long)clientStats.getNumSuccessful()).isEqualTo(2L);
            Assertions.assertThat((long)clientStats.getNumRequests()).isEqualTo(2L);
        }
        Assertions.assertThat((long)serverStats1.getNumConnections()).isEqualTo(0L);
        Assertions.assertThat((long)serverStats2.getNumConnections()).isEqualTo(0L);
        Assertions.assertThat((long)clientStats.getNumConnections()).isEqualTo(0L);
        Assertions.assertThat((long)clientStats.getNumFailed()).isEqualTo(0L);
        Assertions.assertThat((long)clientStats.getNumSuccessful()).isEqualTo(2L);
        Assertions.assertThat((long)clientStats.getNumRequests()).isEqualTo(2L);
    }

    private static class TestServer
    extends AbstractServerBase<TestMessage, TestMessage>
    implements AutoCloseable {
        private final KvStateRequestStats requestStats;

        TestServer(String name, KvStateRequestStats stats, Iterator<Integer> bindPort) throws UnknownHostException {
            super(name, InetAddress.getLocalHost().getHostName(), bindPort, Integer.valueOf(1), Integer.valueOf(1));
            this.requestStats = stats;
        }

        public AbstractServerHandler<TestMessage, TestMessage> initializeHandler() {
            return new AbstractServerHandler<TestMessage, TestMessage>((AbstractServerBase)this, new MessageSerializer((MessageDeserializer)new TestMessage.TestMessageDeserializer(), (MessageDeserializer)new TestMessage.TestMessageDeserializer()), this.requestStats){

                public CompletableFuture<TestMessage> handleRequest(long requestId, TestMessage request) {
                    TestMessage response = new TestMessage(this.getServerName() + "-" + request.getMessage());
                    return CompletableFuture.completedFuture(response);
                }

                public CompletableFuture<Void> shutdown() {
                    return CompletableFuture.completedFuture(null);
                }
            };
        }

        @Override
        public void close() throws Exception {
            this.shutdownServer().get();
            Assertions.assertThat((boolean)this.getQueryExecutor().isTerminated()).isTrue();
            Assertions.assertThat((boolean)this.isEventGroupShutdown()).isTrue();
        }
    }

    private static class TestClient
    extends Client<TestMessage, TestMessage>
    implements AutoCloseable {
        TestClient(String clientName, int numEventLoopThreads, MessageSerializer<TestMessage, TestMessage> serializer, KvStateRequestStats stats) {
            super(clientName, numEventLoopThreads, serializer, stats);
        }

        @Override
        public void close() throws Exception {
            this.shutdown().join();
            Assertions.assertThat((boolean)this.isEventGroupShutdown()).isTrue();
        }
    }

    private static class TestMessage
    extends MessageBody {
        private final String message;

        TestMessage(String message) {
            this.message = (String)Preconditions.checkNotNull((Object)message);
        }

        public String getMessage() {
            return this.message;
        }

        public byte[] serialize() {
            byte[] content = this.message.getBytes(ConfigConstants.DEFAULT_CHARSET);
            return ByteBuffer.allocate(content.length + 4).putInt(content.length).put(content).array();
        }

        public static class TestMessageDeserializer
        implements MessageDeserializer<TestMessage> {
            public TestMessage deserializeMessage(ByteBuf buf) {
                int length = buf.readInt();
                String message = "";
                if (length > 0) {
                    byte[] name = new byte[length];
                    buf.readBytes(name);
                    message = new String(name, ConfigConstants.DEFAULT_CHARSET);
                }
                return new TestMessage(message);
            }
        }
    }
}

