/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcChannel;
import com.google.protobuf.RpcController;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.ipc.AbstractTestIPC;
import org.apache.hadoop.hbase.ipc.AsyncRpcClient;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
@Category(value={SmallTests.class})
public class TestAsyncIPC
extends AbstractTestIPC {
    private static final Log LOG = LogFactory.getLog(TestAsyncIPC.class);
    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private final boolean useNativeTransport;
    private final boolean useGlobalEventLoopGroup;

    @Parameterized.Parameters
    public static Collection<Object[]> parameters() {
        ArrayList<Object[]> paramList = new ArrayList<Object[]>();
        paramList.add(new Object[]{false, false});
        paramList.add(new Object[]{false, true});
        paramList.add(new Object[]{true, false});
        paramList.add(new Object[]{true, true});
        return paramList;
    }

    public TestAsyncIPC(boolean useNativeTransport, boolean useGlobalEventLoopGroup) {
        this.useNativeTransport = useNativeTransport;
        this.useGlobalEventLoopGroup = useGlobalEventLoopGroup;
    }

    private void setConf(Configuration conf) {
        conf.setBoolean("hbase.rpc.client.nativetransport", this.useNativeTransport);
        conf.setBoolean("hbase.rpc.client.globaleventloopgroup", this.useGlobalEventLoopGroup);
        if (this.useGlobalEventLoopGroup && AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP != null && (this.useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof EpollEventLoopGroup) || !this.useNativeTransport && !(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst() instanceof NioEventLoopGroup))) {
            AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP.getFirst().shutdownGracefully();
            AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP = null;
        }
    }

    @Override
    protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
        this.setConf(conf);
        return new AsyncRpcClient(conf, "default-cluster", null){

            @Override
            Codec getCodec() {
                return null;
            }
        };
    }

    @Override
    protected AsyncRpcClient createRpcClient(Configuration conf) {
        this.setConf(conf);
        return new AsyncRpcClient(conf, "default-cluster", null);
    }

    @Override
    protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
        this.setConf(conf);
        return new AsyncRpcClient(conf, "default-cluster", null, new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addFirst(new ChannelHandler[]{new ChannelOutboundHandlerAdapter(){

                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        promise.setFailure((Throwable)new RuntimeException("Injected fault"));
                    }
                }});
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testAsyncConnectionSetup() throws Exception {
        AbstractTestIPC.TestRpcServer rpcServer = new AbstractTestIPC.TestRpcServer();
        AsyncRpcClient client = this.createRpcClient(CONF);
        try {
            rpcServer.start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            RpcChannel channel = client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), User.getCurrent(), 0);
            final AtomicBoolean done = new AtomicBoolean(false);
            channel.callMethod(md, (RpcController)new PayloadCarryingRpcController(), (Message)param, (Message)md.getOutputType().toProto(), (RpcCallback)new RpcCallback<Message>(){

                public void run(Message parameter) {
                    done.set(true);
                }
            });
            TEST_UTIL.waitFor(1000L, new Waiter.Predicate<Exception>(){

                @Override
                public boolean evaluate() throws Exception {
                    return done.get();
                }
            });
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRTEDuringAsyncConnectionSetup() throws Exception {
        AbstractTestIPC.TestRpcServer rpcServer = new AbstractTestIPC.TestRpcServer();
        AsyncRpcClient client = this.createRpcClientRTEDuringConnectionSetup(CONF);
        try {
            rpcServer.start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            RpcChannel channel = client.createRpcChannel(ServerName.valueOf(address.getHostName(), address.getPort(), System.currentTimeMillis()), User.getCurrent(), 0);
            final AtomicBoolean done = new AtomicBoolean(false);
            PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
            controller.notifyOnFail(new RpcCallback<IOException>(){

                public void run(IOException e) {
                    done.set(true);
                    LOG.info("Caught expected exception: " + e.toString());
                    Assert.assertTrue((boolean)StringUtils.stringifyException((Throwable)e).contains("Injected fault"));
                }
            });
            channel.callMethod(md, (RpcController)controller, (Message)param, (Message)md.getOutputType().toProto(), (RpcCallback)new RpcCallback<Message>(){

                public void run(Message parameter) {
                    done.set(true);
                    Assert.fail((String)"Expected an exception to have been thrown!");
                }
            });
            TEST_UTIL.waitFor(1000L, new Waiter.Predicate<Exception>(){

                @Override
                public boolean evaluate() throws Exception {
                    return done.get();
                }
            });
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
        if (args.length != 2) {
            System.out.println("Usage: TestAsyncIPC <CYCLES> <CELLS_PER_CYCLE>");
            return;
        }
        int cycles = Integer.parseInt(args[0]);
        int cellcount = Integer.parseInt(args[1]);
        Configuration conf = HBaseConfiguration.create();
        AbstractTestIPC.TestRpcServer rpcServer = new AbstractTestIPC.TestRpcServer();
        Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
        TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
        AsyncRpcClient client = new AsyncRpcClient(conf, "default-cluster", null);
        KeyValue kv = BIG_CELL;
        Put p = new Put(CellUtil.cloneRow(kv));
        for (int i = 0; i < cellcount; ++i) {
            p.add(kv);
        }
        RowMutations rm = new RowMutations(CellUtil.cloneRow(kv));
        rm.add(p);
        try {
            rpcServer.start();
            InetSocketAddress address = rpcServer.getListenerAddress();
            long startTime = System.currentTimeMillis();
            User user = User.getCurrent();
            for (int i = 0; i < cycles; ++i) {
                ArrayList<CellScannable> cells = new ArrayList<CellScannable>();
                ClientProtos.RegionAction.Builder builder = RequestConverter.buildNoDataRegionAction(HConstants.EMPTY_BYTE_ARRAY, rm, cells, ClientProtos.RegionAction.newBuilder(), ClientProtos.Action.newBuilder(), ClientProtos.MutationProto.newBuilder());
                builder.setRegion(HBaseProtos.RegionSpecifier.newBuilder().setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME).setValue(ByteString.copyFrom((byte[])HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes())));
                if (i % 100000 == 0) {
                    LOG.info("" + i);
                }
                PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
                client.call(pcrc, md, (Message)builder.build(), (Message)param, user, address);
            }
            LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " + (System.currentTimeMillis() - startTime) + "ms");
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }
}

