/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.protobuf.ServiceException;
import drill.shaded.hbase.guava.com.google.common.collect.ImmutableList;
import drill.shaded.hbase.guava.com.google.common.collect.Lists;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.verification.VerificationMode;

public abstract class AbstractTestIPC {
    /** Logger used by the tests in this class to record the exceptions they expect. */
    private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
    /** Bytes of the string "xyz"; passed for every byte[] argument when building {@code CELL}. */
    private static final byte[] CELL_BYTES = Bytes.toBytes("xyz");
    /** Small cell that the cell-block tests attach to their requests. */
    private static final KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
    /** Zero-filled 10240-byte buffer used as the last argument when building {@code BIG_CELL}. */
    static byte[] BIG_CELL_BYTES = new byte[10240];
    /**
     * Larger cell built from {@code CELL_BYTES} and {@code BIG_CELL_BYTES}. Not referenced by any
     * test visible in this class -- presumably used by subclasses or sibling tests; TODO confirm.
     */
    static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
    /** Default configuration shared by several tests and by the test servers' default constructors. */
    static final Configuration CONF = HBaseConfiguration.create();

    /**
     * Builds the client exercised by {@code testNoCodec()}.
     * NOTE(review): the name suggests a client configured without a cell codec; the visible test
     * only checks that no cell scanner comes back -- confirm against the implementations.
     *
     * @param var1 configuration handed to the client being created
     * @return the RPC client; the caller in this class closes it via try-with-resources
     */
    protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Echoes a message through a client built by {@code createRpcClientNoCodec} and checks that
     * the reply carries the same message and that the controller ends up without a cell scanner.
     */
    @Test
    public void testNoCodec() throws IOException, ServiceException {
        Configuration configuration = HBaseConfiguration.create();
        TestRpcServer server = new TestRpcServer();
        try (AbstractRpcClient<?> rpcClient = this.createRpcClientNoCodec(configuration)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            HBaseRpcControllerImpl controller = new HBaseRpcControllerImpl();
            String greeting = "hello";
            TestProtos.EchoRequestProto request =
                TestProtos.EchoRequestProto.newBuilder().setMessage(greeting).build();
            TestProtos.EchoResponseProto reply = blockingStub.echo(controller, request);
            Assert.assertEquals(greeting, reply.getMessage());
            Assert.assertNull(controller.cellScanner());
        } finally {
            // Stop the server whether or not the assertions above passed.
            server.stop();
        }
    }

    /**
     * Builds the RPC client used by most tests in this class.
     *
     * @param var1 configuration handed to the client being created
     * @return the RPC client; callers in this class are responsible for closing it
     */
    protected abstract AbstractRpcClient<?> createRpcClient(Configuration var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends three copies of {@code CELL} alongside an echo request and checks that the same
     * number of equal cells comes back on the controller's cell scanner.
     * Note that the visible code does not set any compression option on the configuration.
     */
    @Test
    public void testCompressCellBlock() throws IOException, ServiceException {
        Configuration configuration = new Configuration(HBaseConfiguration.create());
        final int expectedCells = 3;
        ArrayList<Cell> payload = new ArrayList<>();
        while (payload.size() < expectedCells) {
            payload.add(CELL);
        }
        TestRpcServer server = new TestRpcServer();
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(configuration)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            HBaseRpcControllerImpl controller =
                new HBaseRpcControllerImpl(CellUtil.createCellScanner(payload));
            String greeting = "hello";
            TestProtos.EchoRequestProto request =
                TestProtos.EchoRequestProto.newBuilder().setMessage(greeting).build();
            TestProtos.EchoResponseProto reply = blockingStub.echo(controller, request);
            Assert.assertEquals(greeting, reply.getMessage());

            // The cells are expected back unchanged on the same controller.
            CellScanner returned = controller.cellScanner();
            Assert.assertNotNull(returned);
            int seen = 0;
            for (; returned.advance(); ++seen) {
                Assert.assertEquals(CELL, returned.current());
            }
            Assert.assertEquals(expectedCells, seen);
        } finally {
            server.stop();
        }
    }

    /**
     * Builds the client exercised by {@code testRTEDuringConnectionSetup()}. That test expects an
     * RPC issued through this client to fail with an exception whose stack trace contains
     * "Injected fault".
     *
     * @param var1 configuration handed to the client being created
     * @return the RPC client; the caller in this class closes it via try-with-resources
     * @throws IOException declared by the signature; the triggering condition is up to the implementation
     */
    protected abstract AbstractRpcClient<?> createRpcClientRTEDuringConnectionSetup(Configuration var1) throws IOException;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Pings the server through the client from {@code createRpcClientRTEDuringConnectionSetup}
     * and expects an exception whose stringified stack trace mentions "Injected fault".
     */
    @Test
    public void testRTEDuringConnectionSetup() throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        TestRpcServer server = new TestRpcServer();
        try (AbstractRpcClient<?> rpcClient = this.createRpcClientRTEDuringConnectionSetup(configuration)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            blockingStub.ping(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            // Assert.fail raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Expected an exception to have been thrown!");
        } catch (Exception thrown) {
            String description = thrown.toString();
            LOG.info("Caught expected exception: " + description);
            String trace = StringUtils.stringifyException(thrown);
            Assert.assertTrue(description, trace.contains("Injected fault"));
        } finally {
            server.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Wraps a {@link FifoRpcScheduler} in a Mockito spy and verifies that the server calls
     * {@code init}, {@code start}, {@code dispatch} (once per echo call, ten in total) and
     * {@code stop} on it.
     */
    @Test
    public void testRpcScheduler() throws IOException, ServiceException, InterruptedException {
        final int echoCalls = 10;
        RpcScheduler spiedScheduler = Mockito.spy(new FifoRpcScheduler(CONF, 1));
        TestRpcServer server = new TestRpcServer(spiedScheduler, CONF);
        // init is expected to have happened during server construction.
        Mockito.verify(spiedScheduler).init((RpcScheduler.Context) Matchers.anyObject());
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(CONF)) {
            server.start();
            Mockito.verify(spiedScheduler).start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            TestProtos.EchoRequestProto request =
                TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            int sent = 0;
            while (sent < echoCalls) {
                blockingStub.echo(null, request);
                sent++;
            }
            VerificationMode oncePerCall = VerificationModeFactory.times(echoCalls);
            Mockito.verify(spiedScheduler, oncePerCall).dispatch((CallRunner) Matchers.anyObject());
        } finally {
            server.stop();
            Mockito.verify(spiedScheduler).stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sets "hbase.ipc.max.request.size" to 1000 and sends an echo request whose message is
     * 1200 characters long, expecting a {@link ServiceException} whose stringified stack trace
     * mentions "RequestTooBigException".
     */
    @Test
    public void testRpcMaxRequestSize() throws IOException, ServiceException {
        Configuration limited = new Configuration(CONF);
        limited.setInt("hbase.ipc.max.request.size", 1000);
        TestRpcServer server = new TestRpcServer(limited);
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(limited)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            // 200 repetitions of the 6-character chunk give a 1200-character message.
            StringBuilder oversized = new StringBuilder(1200);
            while (oversized.length() < 1200) {
                oversized.append("hello.");
            }
            TestProtos.EchoRequestProto request =
                TestProtos.EchoRequestProto.newBuilder().setMessage(oversized.toString()).build();
            HBaseRpcControllerImpl controller =
                new HBaseRpcControllerImpl(CellUtil.createCellScanner(ImmutableList.<Cell>of(CELL)));
            blockingStub.echo(controller, request);
            Assert.fail("RPC should have failed because it exceeds max request size");
        } catch (ServiceException rejected) {
            LOG.info("Caught expected exception: " + rejected);
            String trace = StringUtils.stringifyException(rejected);
            Assert.assertTrue(rejected.toString(), trace.contains("RequestTooBigException"));
        } finally {
            server.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls the {@code addr} RPC and checks that the address string in the response equals the
     * host address that "localhost" resolves to on this machine.
     */
    @Test
    public void testRpcServerForNotNullRemoteAddressInCallObject() throws IOException, ServiceException {
        TestRpcServer server = new TestRpcServer();
        InetSocketAddress loopback = new InetSocketAddress("localhost", 0);
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(CONF)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, server.getListenerAddress());
            String expectedHost = loopback.getAddress().getHostAddress();
            Assert.assertEquals(
                expectedHost,
                blockingStub.addr(null, TestProtos.EmptyRequestProto.getDefaultInstance()).getAddr());
        } finally {
            server.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls the {@code error} RPC through a blocking stub and requires it to fail with a
     * {@link ServiceException} that unwraps to a {@link DoNotRetryIOException} whose message
     * contains "server error!".
     *
     * @throws IOException if the server or client cannot be set up or torn down
     * @throws ServiceException declared for signature compatibility; the expected one is caught
     */
    @Test
    public void testRemoteError() throws IOException, ServiceException {
        TestRpcServer rpcServer = new TestRpcServer();
        try (AbstractRpcClient<?> client = this.createRpcClient(CONF)) {
            rpcServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
                TestProtobufRpcServiceImpl.newBlockingStub(client, rpcServer.getListenerAddress());
            stub.error(null, TestProtos.EmptyRequestProto.getDefaultInstance());
            // Without this the test would pass vacuously if the call returned normally.
            // Assert.fail raises an AssertionError, which the catch below does not intercept.
            Assert.fail("Expected the error RPC to throw a ServiceException");
        } catch (ServiceException e) {
            LOG.info("Caught expected exception: " + e);
            IOException ioe = ProtobufUtil.handleRemoteException(e);
            Assert.assertTrue(String.valueOf(ioe), ioe instanceof DoNotRetryIOException);
            Assert.assertTrue(String.valueOf(ioe), ioe.getMessage().contains("server error!"));
        } finally {
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Issues ten blocking {@code pause} calls built with {@code setMs(1000)} while the controller
     * carries a call timeout of 100, and requires every call to fail with a
     * {@link CallTimeoutException} cause in less than 1000 ms of wall-clock time.
     *
     * @throws IOException if the server or client cannot be set up or torn down
     */
    @Test
    public void testTimeout() throws IOException {
        TestRpcServer rpcServer = new TestRpcServer();
        try (AbstractRpcClient<?> client = this.createRpcClient(CONF)) {
            rpcServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface stub =
                TestProtobufRpcServiceImpl.newBlockingStub(client, rpcServer.getListenerAddress());
            HBaseRpcControllerImpl pcrc = new HBaseRpcControllerImpl();
            int ms = 1000;
            int timeout = 100;
            for (int i = 0; i < 10; ++i) {
                // The same controller is reused, so clear the previous call's state first.
                pcrc.reset();
                pcrc.setCallTimeout(timeout);
                long startTime = System.nanoTime();
                try {
                    stub.pause(pcrc, TestProtos.PauseRequestProto.newBuilder().setMs(ms).build());
                    // A call that returns normally never timed out; previously this was
                    // silently skipped, letting the test pass without observing any timeout.
                    Assert.fail("Call " + i + " returned normally; expected it to time out (timeout="
                        + timeout + ", pause=" + ms + ")");
                } catch (ServiceException e) {
                    long waitTime = (System.nanoTime() - startTime) / 1000000L;
                    LOG.info("Caught expected exception: " + e);
                    IOException ioe = ProtobufUtil.handleRemoteException(e);
                    Assert.assertTrue(String.valueOf(ioe), ioe.getCause() instanceof CallTimeoutException);
                    // The failure must arrive before the requested pause would have elapsed.
                    Assert.assertTrue("waited " + waitTime + " ms", waitTime < (long) ms);
                }
            }
        } finally {
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends an echo request to a {@link TestFailingRpcServer}, whose connections throw from
     * {@code processRequest}, and expects the blocking call to fail with a
     * {@link ServiceException}.
     */
    @Test
    public void testConnectionCloseWithOutstandingRPCs() throws InterruptedException, IOException {
        Configuration configuration = new Configuration(CONF);
        TestFailingRpcServer failingServer = new TestFailingRpcServer(configuration);
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(configuration)) {
            failingServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface blockingStub =
                TestProtobufRpcServiceImpl.newBlockingStub(rpcClient, failingServer.getListenerAddress());
            TestProtos.EchoRequestProto request =
                TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            blockingStub.echo(null, request);
            Assert.fail("RPC should have failed because connection closed");
        } catch (ServiceException expected) {
            LOG.info("Caught expected exception: " + expected.toString());
        } finally {
            failingServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fires ten asynchronous echo calls ("hello-0" .. "hello-9") and then, for each call, waits
     * for its callback before checking that the controller did not fail, carries no cell scanner,
     * and that the response echoes the message that was sent.
     *
     * @throws IOException if setup/teardown fails or waiting on a callback fails
     */
    @Test
    public void testAsyncEcho() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        TestRpcServer rpcServer = new TestRpcServer();
        try (AbstractRpcClient<?> client = this.createRpcClient(conf)) {
            rpcServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.Interface stub =
                TestProtobufRpcServiceImpl.newStub(client, rpcServer.getListenerAddress());
            int num = 10;
            ArrayList<HBaseRpcController> pcrcList = new ArrayList<HBaseRpcController>(num);
            ArrayList<BlockingRpcCallback<TestProtos.EchoResponseProto>> callbackList =
                new ArrayList<BlockingRpcCallback<TestProtos.EchoResponseProto>>(num);
            // Issue every call up front so they are all outstanding at once.
            for (int i = 0; i < num; ++i) {
                HBaseRpcController pcrc = new HBaseRpcControllerImpl();
                BlockingRpcCallback<TestProtos.EchoResponseProto> done =
                    new BlockingRpcCallback<TestProtos.EchoResponseProto>();
                stub.echo(pcrc, TestProtos.EchoRequestProto.newBuilder().setMessage("hello-" + i).build(), done);
                pcrcList.add(pcrc);
                callbackList.add(done);
            }
            for (int i = 0; i < num; ++i) {
                HBaseRpcController pcrc = pcrcList.get(i);
                // Wait for the call to finish BEFORE inspecting the controller; checking
                // failed()/cellScanner() first could observe a call that is still in flight.
                TestProtos.EchoResponseProto response = callbackList.get(i).get();
                Assert.assertFalse("call " + i + " failed: " + pcrc.getFailed(), pcrc.failed());
                Assert.assertNull(pcrc.cellScanner());
                Assert.assertNotNull("call " + i + " produced no response", response);
                Assert.assertEquals("hello-" + i, response.getMessage());
            }
        } finally {
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Calls the {@code error} RPC through the asynchronous stub and checks that the callback
     * yields {@code null}, the controller is marked failed, and the failure unwraps to a
     * {@link DoNotRetryIOException} whose message contains "server error!".
     *
     * @throws IOException if setup/teardown fails or waiting on the callback fails
     */
    @Test
    public void testAsyncRemoteError() throws IOException {
        TestRpcServer rpcServer = new TestRpcServer();
        // try-with-resources (as in the sibling tests) guarantees the client is closed, and the
        // finally block still stops the server even if closing the client throws.
        try (AbstractRpcClient<?> client = this.createRpcClient(CONF)) {
            rpcServer.start();
            TestRpcServiceProtos.TestProtobufRpcProto.Interface stub =
                TestProtobufRpcServiceImpl.newStub(client, rpcServer.getListenerAddress());
            BlockingRpcCallback<TestProtos.EmptyResponseProto> callback =
                new BlockingRpcCallback<TestProtos.EmptyResponseProto>();
            HBaseRpcControllerImpl pcrc = new HBaseRpcControllerImpl();
            stub.error(pcrc, TestProtos.EmptyRequestProto.getDefaultInstance(), callback);
            // Waiting on the callback first ensures the call has completed before the
            // controller is inspected.
            Assert.assertNull(callback.get());
            Assert.assertTrue(pcrc.failed());
            LOG.info("Caught expected exception: " + pcrc.getFailed());
            IOException ioe = ProtobufUtil.handleRemoteException(pcrc.getFailed());
            Assert.assertTrue(String.valueOf(ioe), ioe instanceof DoNotRetryIOException);
            Assert.assertTrue(String.valueOf(ioe), ioe.getMessage().contains("server error!"));
        } finally {
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fires ten asynchronous {@code pause} calls built with {@code setMs(1000)}, each with a call
     * timeout of 100 on its controller, waits for all callbacks, and checks that every callback
     * yielded {@code null}, every controller failed with a {@link CallTimeoutException} cause,
     * and the total wait stayed below 1000 ms.
     */
    @Test
    public void testAsyncTimeout() throws IOException {
        TestRpcServer server = new TestRpcServer();
        try (AbstractRpcClient<?> rpcClient = this.createRpcClient(CONF)) {
            server.start();
            TestRpcServiceProtos.TestProtobufRpcProto.Interface asyncStub =
                TestProtobufRpcServiceImpl.newStub(rpcClient, server.getListenerAddress());
            ArrayList<HBaseRpcControllerImpl> controllers = new ArrayList<>();
            ArrayList<BlockingRpcCallback<TestProtos.EmptyResponseProto>> pendingCallbacks = new ArrayList<>();
            final int pauseMillis = 1000;
            final int timeoutMillis = 100;
            long startNanos = System.nanoTime();
            for (int call = 0; call < 10; ++call) {
                HBaseRpcControllerImpl controller = new HBaseRpcControllerImpl();
                controller.setCallTimeout(timeoutMillis);
                BlockingRpcCallback<TestProtos.EmptyResponseProto> pending = new BlockingRpcCallback<>();
                TestProtos.PauseRequestProto request =
                    TestProtos.PauseRequestProto.newBuilder().setMs(pauseMillis).build();
                asyncStub.pause(controller, request, pending);
                controllers.add(controller);
                pendingCallbacks.add(pending);
            }
            // Wait for every call to complete before measuring the elapsed time.
            for (int call = 0; call < pendingCallbacks.size(); ++call) {
                Assert.assertNull(pendingCallbacks.get(call).get());
            }
            long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;
            for (int call = 0; call < controllers.size(); ++call) {
                HBaseRpcController controller = controllers.get(call);
                Assert.assertTrue(controller.failed());
                LOG.info("Caught expected exception: " + controller.getFailed());
                IOException unwrapped = ProtobufUtil.handleRemoteException(controller.getFailed());
                Assert.assertTrue(unwrapped.getCause() instanceof CallTimeoutException);
            }
            Assert.assertTrue(elapsedMillis < (long) pauseMillis);
        } finally {
            server.stop();
        }
    }

    /**
     * Test server whose connections throw a {@link DoNotRetryIOException} from
     * {@code processRequest} for every request buffer they are handed.
     */
    static class TestFailingRpcServer extends TestRpcServer {

        /** Connection that rejects every request with "Failing for test". */
        class FailingConnection extends RpcServer.Connection {
            public FailingConnection(SocketChannel channel, long lastContact) {
                super(channel, lastContact);
            }

            @Override
            protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException {
                throw new DoNotRetryIOException("Failing for test");
            }
        }

        /** Uses the shared {@code CONF} and a {@link FifoRpcScheduler} built from it with argument 1. */
        TestFailingRpcServer() throws IOException {
            this(CONF);
        }

        /** Uses {@code conf} and a {@link FifoRpcScheduler} built from it with argument 1. */
        TestFailingRpcServer(Configuration conf) throws IOException {
            this(new FifoRpcScheduler(conf, 1), conf);
        }

        /** Passes the given scheduler and configuration straight to {@link TestRpcServer}. */
        TestFailingRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
            super(scheduler, conf);
        }

        /** Hands out a {@link FailingConnection} for the given channel and timestamp. */
        @Override
        protected RpcServer.Connection getConnection(SocketChannel channel, long time) {
            return new FailingConnection(channel, time);
        }
    }

    static class TestRpcServer
    extends RpcServer {
        TestRpcServer() throws IOException {
            this(new FifoRpcScheduler(CONF, 1), CONF);
        }

        TestRpcServer(Configuration conf) throws IOException {
            this(new FifoRpcScheduler(conf, 1), conf);
        }

        TestRpcServer(RpcScheduler scheduler, Configuration conf) throws IOException {
            super(null, "testRpcServer", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(TestProtobufRpcServiceImpl.SERVICE, null)), new InetSocketAddress("localhost", 0), conf, scheduler);
        }
    }
}

