/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.netty.channel.epoll;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hive.druid.io.netty.bootstrap.Bootstrap;
import org.apache.hive.druid.io.netty.buffer.ByteBuf;
import org.apache.hive.druid.io.netty.channel.AdaptiveRecvByteBufAllocator;
import org.apache.hive.druid.io.netty.channel.Channel;
import org.apache.hive.druid.io.netty.channel.ChannelFuture;
import org.apache.hive.druid.io.netty.channel.ChannelHandler;
import org.apache.hive.druid.io.netty.channel.ChannelHandlerContext;
import org.apache.hive.druid.io.netty.channel.ChannelOption;
import org.apache.hive.druid.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hive.druid.io.netty.channel.epoll.EpollChannelOption;
import org.apache.hive.druid.io.netty.channel.epoll.EpollSocketTestPermutation;
import org.apache.hive.druid.io.netty.channel.epoll.Native;
import org.apache.hive.druid.io.netty.channel.socket.DatagramPacket;
import org.apache.hive.druid.io.netty.testsuite.transport.AbstractComboTestsuiteTest;
import org.apache.hive.druid.io.netty.testsuite.transport.TestsuitePermutation;
import org.apache.hive.druid.io.netty.testsuite.transport.socket.AbstractDatagramTest;
import org.apache.hive.druid.io.netty.util.internal.PlatformDependent;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

/**
 * Tests datagram reads on the epoll transport when several packets may be delivered
 * per read loop, for both connected and unconnected receiving channels.
 *
 * <p>The whole class is skipped when {@link Native#IS_SUPPORTING_RECVMMSG} is {@code false}.
 */
public class EpollDatagramScatteringReadTest extends AbstractDatagramTest {
    /** Maximum time, in seconds, to wait for the receiving handler to see all expected packets. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;

    /** Skips every test in this class unless the native layer reports recvmmsg support. */
    @BeforeAll
    public static void assumeRecvmmsgSupported() {
        Assumptions.assumeTrue(Native.IS_SUPPORTING_RECVMMSG);
    }

    /** Restricts the bootstrap permutations to the epoll-only datagram combinations. */
    protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
        return EpollSocketTestPermutation.INSTANCE.epollOnlyDatagram(this.internetProtocolFamily());
    }

    /** Unconnected receiver, receive buffer initially sized for half of the packets. */
    @Test
    public void testScatteringReadPartial(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringReadPartial(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringReadPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringRead(sb, cb, false, true);
    }

    /** Unconnected receiver, receive buffer initially sized for all packets. */
    @Test
    public void testScatteringRead(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringRead(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringRead(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringRead(sb, cb, false, false);
    }

    /** Connected receiver, receive buffer initially sized for half of the packets. */
    @Test
    public void testScatteringReadConnectedPartial(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringReadConnectedPartial(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringReadConnectedPartial(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringRead(sb, cb, true, true);
    }

    /** Connected receiver, receive buffer initially sized for all packets. */
    @Test
    public void testScatteringConnectedRead(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringConnectedRead(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringConnectedRead(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringRead(sb, cb, true, false);
    }

    /**
     * Sends four 8-byte packets, each carrying its sequence number as a long, from a client
     * channel to a receiving channel and verifies that they arrive in order, from the client's
     * address, with exactly 8 readable bytes each, and more than one per read-complete cycle.
     *
     * @param sb        bootstrap for the receiving channel
     * @param cb        bootstrap for the sending channel
     * @param connected whether the receiving channel connects to the sender before receiving
     * @param partial   whether the receive allocator's initial size covers only half the packets
     * @throws Throwable the first handler error if the packets do not arrive in time, or any
     *                   failure while binding, writing or closing
     */
    private void testScatteringRead(Bootstrap sb, Bootstrap cb, boolean connected, boolean partial) throws Throwable {
        final int packetSize = 8;
        final int numPackets = 4;
        int initialBufferSize = packetSize * (partial ? numPackets / 2 : numPackets);
        sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(packetSize, initialBufferSize, 65536));
        // Deliberately twice the real packet size: the readableBytes assertion in the handler
        // then only holds if each packet is sized by what was actually received.
        sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, packetSize * 2);
        Channel sc = null;
        Channel cc = null;
        try {
            // The sender never expects inbound data; install a handler that ignores it.
            cb.handler(new SimpleChannelInboundHandler<Object>() {

                public void channelRead0(ChannelHandlerContext ctx, Object msgs) throws Exception {
                }
            });
            cc = cb.bind(this.newSocketAddress()).sync().channel();
            final SocketAddress ccAddress = cc.localAddress();
            final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
            final CountDownLatch latch = new CountDownLatch(numPackets);
            sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                /** Total packets seen so far; doubles as the expected sequence number. */
                private long numRead;
                /** Packets seen since the previous channelReadComplete. */
                private int counter;

                public void channelReadComplete(ChannelHandlerContext ctx) {
                    // Every read cycle must have delivered more than one packet.
                    Assertions.assertTrue(this.counter > 1);
                    this.counter = 0;
                    ctx.read();
                }

                protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
                    Assertions.assertEquals(ccAddress, msg.sender());
                    Assertions.assertEquals(packetSize, msg.content().readableBytes());
                    Assertions.assertEquals(this.numRead, msg.content().readLong());
                    ++this.numRead;
                    ++this.counter;
                    latch.countDown();
                }

                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                    // Keep only the first failure; it is reported by awaitPackets on timeout.
                    errorRef.compareAndSet(null, cause);
                }
            });
            // Reading stays off until every packet has been written, so that all of them
            // are already queued on the socket when the first read happens.
            sb.option(ChannelOption.AUTO_READ, false);
            sc = sb.bind(this.newSocketAddress()).sync().channel();
            if (connected) {
                sc.connect(cc.localAddress()).syncUninterruptibly();
            }
            InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
            List<ChannelFuture> futures = new ArrayList<ChannelFuture>(numPackets);
            for (int i = 0; i < numPackets; ++i) {
                futures.add(cc.write(new DatagramPacket(cc.alloc().directBuffer().writeLong(i), addr)));
            }
            cc.flush();
            for (ChannelFuture f : futures) {
                f.sync();
            }
            sc.config().setAutoRead(true);
            awaitPackets(latch, errorRef);
        } finally {
            closeChannels(cc, sc);
        }
    }

    /** Unconnected receiver; a 16-byte packet is read into a 1400-byte receive buffer. */
    @Test
    public void testScatteringReadWithSmallBuffer(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringReadWithSmallBuffer(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringReadWithSmallBuffer0(sb, cb, false);
    }

    /** Connected receiver; a 16-byte packet is read into a 1400-byte receive buffer. */
    @Test
    public void testScatteringConnectedReadWithSmallBuffer(TestInfo testInfo) throws Throwable {
        this.run(testInfo, new AbstractComboTestsuiteTest.Runner<Bootstrap, Bootstrap>() {

            public void run(Bootstrap bootstrap, Bootstrap bootstrap2) throws Throwable {
                EpollDatagramScatteringReadTest.this.testScatteringConnectedReadWithSmallBuffer(bootstrap, bootstrap2);
            }
        });
    }

    public void testScatteringConnectedReadWithSmallBuffer(Bootstrap sb, Bootstrap cb) throws Throwable {
        this.testScatteringReadWithSmallBuffer0(sb, cb, true);
    }

    /**
     * Sends one packet of 16 random bytes to a receiving channel whose receive allocator and
     * maximum datagram payload size are both fixed at 1400, and verifies sender, length and
     * content of the received packet.
     *
     * @param sb        bootstrap for the receiving channel
     * @param cb        bootstrap for the sending channel
     * @param connected whether the receiving channel connects to the sender before receiving
     * @throws Throwable the first handler error if the packet does not arrive in time, or any
     *                   failure while binding, writing or closing
     */
    private void testScatteringReadWithSmallBuffer0(Bootstrap sb, Bootstrap cb, boolean connected) throws Throwable {
        final int packetSize = 16;
        sb.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(1400, 1400, 65536));
        sb.option(EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 1400);
        Channel sc = null;
        Channel cc = null;
        try {
            // The sender never expects inbound data; install a handler that ignores it.
            cb.handler(new SimpleChannelInboundHandler<Object>() {

                public void channelRead0(ChannelHandlerContext ctx, Object msgs) {
                }
            });
            cc = cb.bind(this.newSocketAddress()).sync().channel();
            final SocketAddress ccAddress = cc.localAddress();
            final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
            final byte[] bytes = new byte[packetSize];
            PlatformDependent.threadLocalRandom().nextBytes(bytes);
            final CountDownLatch latch = new CountDownLatch(1);
            sb.handler(new SimpleChannelInboundHandler<DatagramPacket>() {

                protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
                    Assertions.assertEquals(ccAddress, msg.sender());
                    Assertions.assertEquals(bytes.length, msg.content().readableBytes());
                    byte[] receivedBytes = new byte[bytes.length];
                    msg.content().readBytes(receivedBytes);
                    Assertions.assertArrayEquals(bytes, receivedBytes);
                    latch.countDown();
                }

                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                    // Keep only the first failure; it is reported by awaitPackets on timeout.
                    errorRef.compareAndSet(null, cause);
                }
            });
            sc = sb.bind(this.newSocketAddress()).sync().channel();
            if (connected) {
                sc.connect(cc.localAddress()).syncUninterruptibly();
            }
            InetSocketAddress addr = (InetSocketAddress) sc.localAddress();
            cc.writeAndFlush(new DatagramPacket(cc.alloc().directBuffer().writeBytes(bytes), addr)).sync();
            awaitPackets(latch, errorRef);
        } finally {
            closeChannels(cc, sc);
        }
    }

    /**
     * Waits for {@code latch} to reach zero; on timeout rethrows the recorded handler error,
     * or fails with a timeout message when no error was recorded.
     *
     * <p>NOTE(review): an error recorded while the latch still completes in time is not
     * reported here; confirm whether that is intended.
     */
    private static void awaitPackets(CountDownLatch latch, AtomicReference<Throwable> errorRef) throws Throwable {
        if (latch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            return;
        }
        Throwable error = errorRef.get();
        if (error != null) {
            throw error;
        }
        Assertions.fail("Timeout while waiting for packets");
    }

    /**
     * Closes the client channel and then the server channel, skipping any that is {@code null}.
     * The server channel is closed even when closing the client channel throws.
     */
    private static void closeChannels(Channel cc, Channel sc) {
        try {
            if (cc != null) {
                cc.close().syncUninterruptibly();
            }
        } finally {
            if (sc != null) {
                sc.close().syncUninterruptibly();
            }
        }
    }
}

