/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.network;

import java.nio.channels.ClosedChannelException;
import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
import org.apache.flink.queryablestate.messages.KvStateResponse;
import org.apache.flink.queryablestate.network.ClientHandler;
import org.apache.flink.queryablestate.network.ClientHandlerCallback;
import org.apache.flink.queryablestate.network.messages.MessageBody;
import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
import org.apache.flink.queryablestate.network.messages.MessageSerializer;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class KvStateClientHandlerTest {
    KvStateClientHandlerTest() {
    }

    @Test
    void testReadCallbacksAndBufferRecycling() throws Exception {
        TestingClientHandlerCallback callback = new TestingClientHandlerCallback();
        MessageSerializer serializer = new MessageSerializer((MessageDeserializer)new KvStateInternalRequest.KvStateInternalRequestDeserializer(), (MessageDeserializer)new KvStateResponse.KvStateResponseDeserializer());
        EmbeddedChannel channel = new EmbeddedChannel(new ChannelHandler[]{new ClientHandler("Test Client", serializer, (ClientHandlerCallback)callback)});
        byte[] content = new byte[]{};
        KvStateResponse response = new KvStateResponse(content);
        ByteBuf buf = MessageSerializer.serializeResponse((ByteBufAllocator)channel.alloc(), (long)1222112277L, (MessageBody)response);
        buf.skipBytes(4);
        callback.reset();
        channel.writeInbound(new Object[]{buf});
        Assertions.assertThat((int)callback.onRequestCnt).isEqualTo(1);
        Assertions.assertThat((long)callback.onRequestId).isEqualTo(1222112277L);
        Assertions.assertThat((Object)callback.onRequestBody).isInstanceOf(KvStateResponse.class);
        Assertions.assertThat((int)buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        buf = MessageSerializer.serializeRequestFailure((ByteBufAllocator)channel.alloc(), (long)1222112278L, (Throwable)new RuntimeException("Expected test Exception"));
        buf.skipBytes(4);
        callback.reset();
        channel.writeInbound(new Object[]{buf});
        Assertions.assertThat((int)callback.onRequestFailureCnt).isEqualTo(1);
        Assertions.assertThat((long)callback.onRequestFailureId).isEqualTo(1222112278L);
        Assertions.assertThat((Throwable)callback.onRequestFailureBody).isInstanceOf(RuntimeException.class);
        Assertions.assertThat((int)buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        buf = MessageSerializer.serializeServerFailure((ByteBufAllocator)channel.alloc(), (Throwable)new RuntimeException("Expected test Exception"));
        buf.skipBytes(4);
        callback.reset();
        channel.writeInbound(new Object[]{buf});
        Assertions.assertThat((int)callback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat((Throwable)callback.onFailureBody).isInstanceOf(RuntimeException.class);
        buf = channel.alloc().buffer(4).writeInt(1223823);
        callback.reset();
        channel.writeInbound(new Object[]{buf});
        Assertions.assertThat((int)callback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat((Throwable)callback.onFailureBody).isInstanceOf(RuntimeException.class);
        Assertions.assertThat((int)buf.refCnt()).isEqualTo(0).withFailMessage("Buffer not recycled", new Object[0]);
        callback.reset();
        channel.pipeline().fireExceptionCaught((Throwable)new RuntimeException("Expected test Exception"));
        Assertions.assertThat((int)callback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat((Throwable)callback.onFailureBody).isInstanceOf(RuntimeException.class);
        callback.reset();
        channel.pipeline().fireChannelInactive();
        Assertions.assertThat((int)callback.onFailureCnt).isEqualTo(1);
        Assertions.assertThat((Throwable)callback.onFailureBody).isInstanceOf(ClosedChannelException.class);
    }

    private static class TestingClientHandlerCallback
    implements ClientHandlerCallback {
        private int onRequestCnt;
        private long onRequestId;
        private MessageBody onRequestBody;
        private int onRequestFailureCnt;
        private long onRequestFailureId;
        private Throwable onRequestFailureBody;
        private int onFailureCnt;
        private Throwable onFailureBody;

        private TestingClientHandlerCallback() {
        }

        public void onRequestResult(long requestId, MessageBody response) {
            ++this.onRequestCnt;
            this.onRequestId = requestId;
            this.onRequestBody = response;
        }

        public void onRequestFailure(long requestId, Throwable cause) {
            ++this.onRequestFailureCnt;
            this.onRequestFailureId = requestId;
            this.onRequestFailureBody = cause;
        }

        public void onFailure(Throwable cause) {
            ++this.onFailureCnt;
            this.onFailureBody = cause;
        }

        public void reset() {
            this.onRequestCnt = 0;
            this.onRequestId = -1L;
            this.onRequestBody = null;
            this.onRequestFailureCnt = 0;
            this.onRequestFailureId = -1L;
            this.onRequestFailureBody = null;
            this.onFailureCnt = 0;
            this.onFailureBody = null;
        }
    }
}

