/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.BlockingService;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestDelayedRpcProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hive.com.google.common.collect.Lists;
import org.apache.hive.org.apache.commons.logging.Log;
import org.apache.hive.org.apache.commons.logging.LogFactory;
import org.apache.hive.org.apache.log4j.Appender;
import org.apache.hive.org.apache.log4j.AppenderSkeleton;
import org.apache.hive.org.apache.log4j.Level;
import org.apache.hive.org.apache.log4j.Logger;
import org.apache.hive.org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
public class TestDelayedRpc {
    private static final Log LOG = LogFactory.getLog(TestDelayedRpc.class);
    public static RpcServerInterface rpcServer;
    public static final int UNDELAYED = 0;
    public static final int DELAYED = 1;
    private static final int RPC_CLIENT_TIMEOUT = 30000;

    @Test(timeout=60000L)
    public void testDelayedRpcImmediateReturnValue() throws Exception {
        this.testDelayedRpc(false);
    }

    @Test(timeout=60000L)
    public void testDelayedRpcDelayedReturnValue() throws Exception {
        this.testDelayedRpc(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDelayedRpc(boolean delayReturnValue) throws Exception {
        LOG.info("Running testDelayedRpc delayReturnValue=" + delayReturnValue);
        Configuration conf = HBaseConfiguration.create();
        InetSocketAddress isa = new InetSocketAddress("localhost", 0);
        TestDelayedImplementation instance = new TestDelayedImplementation(delayReturnValue);
        BlockingService service = TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
        rpcServer = new RpcServer(null, "testDelayedRpc", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        try (RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());){
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(), 30000);
            TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
            ArrayList<Integer> results = new ArrayList<Integer>();
            TestThread th1 = new TestThread(stub, true, results);
            TestThread th2 = new TestThread(stub, false, results);
            TestThread th3 = new TestThread(stub, false, results);
            th1.start();
            Thread.sleep(100L);
            th2.start();
            Thread.sleep(200L);
            th3.start();
            th1.join();
            th2.join();
            th3.join();
            Assert.assertEquals((long)0L, (long)((Integer)results.get(0)).intValue());
            Assert.assertEquals((long)0L, (long)((Integer)results.get(1)).intValue());
            Assert.assertEquals((long)((Integer)results.get(2)).intValue(), (long)(delayReturnValue ? 1L : -559038737L));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testTooManyDelayedRpcs() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int MAX_DELAYED_RPC = 10;
        conf.setInt("hbase.ipc.warn.delayedrpc.number", 10);
        ListAppender listAppender = new ListAppender();
        Logger log = Logger.getLogger(RpcServer.class);
        log.addAppender((Appender)((Object)listAppender));
        log.setLevel(Level.WARN);
        InetSocketAddress isa = new InetSocketAddress("localhost", 0);
        TestDelayedImplementation instance = new TestDelayedImplementation(true);
        BlockingService service = TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
        rpcServer = new RpcServer(null, "testTooManyDelayedRpcs", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        try (RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());){
            int i;
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(), 30000);
            TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
            Thread[] threads = new Thread[11];
            for (i = 0; i < 10; ++i) {
                threads[i] = new TestThread(stub, true, null);
                threads[i].start();
            }
            Assert.assertTrue((boolean)listAppender.getMessages().isEmpty());
            threads[10] = new TestThread(stub, true, null);
            threads[10].start();
            for (i = 0; i < 10; ++i) {
                threads[i].join();
            }
            Assert.assertFalse((boolean)listAppender.getMessages().isEmpty());
            Assert.assertTrue((boolean)listAppender.getMessages().get(0).startsWith("Too many delayed calls"));
            log.removeAppender((Appender)((Object)listAppender));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testEndDelayThrowing() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        InetSocketAddress isa = new InetSocketAddress("localhost", 0);
        FaultyTestDelayedImplementation instance = new FaultyTestDelayedImplementation();
        BlockingService service = TestDelayedRpcProtos.TestDelayedService.newReflectiveBlockingService(instance);
        rpcServer = new RpcServer(null, "testEndDelayThrowing", Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)), isa, conf, new FifoRpcScheduler(conf, 1));
        rpcServer.start();
        try (RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());){
            BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(ServerName.valueOf(rpcServer.getListenerAddress().getHostName(), rpcServer.getListenerAddress().getPort(), System.currentTimeMillis()), User.getCurrent(), 1000);
            TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub = TestDelayedRpcProtos.TestDelayedService.newBlockingStub(channel);
            int result = -559038737;
            try {
                result = stub.test(null, TestDelayedRpcProtos.TestArg.newBuilder().setDelay(false).build()).getResponse();
            }
            catch (Exception e) {
                Assert.fail((String)"No exception should have been thrown.");
            }
            Assert.assertEquals((long)result, (long)0L);
            boolean caughtException = false;
            try {
                result = stub.test(null, TestDelayedRpcProtos.TestArg.newBuilder().setDelay(true).build()).getResponse();
            }
            catch (Exception e) {
                if (e.getCause().getMessage().contains("java.lang.Exception: Something went wrong")) {
                    caughtException = true;
                }
                LOG.warn("Caught exception, expected=" + caughtException);
            }
            Assert.assertTrue((boolean)caughtException);
        }
    }

    private static class FaultyTestDelayedImplementation
    extends TestDelayedImplementation {
        public FaultyTestDelayedImplementation() {
            super(false);
        }

        @Override
        public TestDelayedRpcProtos.TestResponse test(RpcController rpcController, TestDelayedRpcProtos.TestArg arg) throws ServiceException {
            LOG.info("In faulty test, delay=" + arg.getDelay());
            if (!arg.getDelay()) {
                return TestDelayedRpcProtos.TestResponse.newBuilder().setResponse(0).build();
            }
            RpcCallContext call = RpcServer.getCurrentCall();
            call.startDelay(true);
            LOG.info("In faulty test, delaying");
            try {
                call.endDelayThrowing(new Exception("Something went wrong"));
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            return TestDelayedRpcProtos.TestResponse.newBuilder().setResponse(1).build();
        }
    }

    public static class TestThread
    extends Thread {
        private final TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub;
        private final boolean delay;
        private final List<Integer> results;

        public TestThread(TestDelayedRpcProtos.TestDelayedService.BlockingInterface stub, boolean delay, List<Integer> results) {
            this.stub = stub;
            this.delay = delay;
            this.results = results;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Integer result;
            try {
                result = new Integer(this.stub.test(null, TestDelayedRpcProtos.TestArg.newBuilder().setDelay(this.delay).build()).getResponse());
            }
            catch (ServiceException e) {
                throw new RuntimeException(e);
            }
            if (this.results != null) {
                List<Integer> list = this.results;
                synchronized (list) {
                    this.results.add(result);
                }
            }
        }
    }

    public static class TestDelayedImplementation
    implements TestDelayedRpcProtos.TestDelayedService.BlockingInterface {
        private final boolean delayReturnValue;

        public TestDelayedImplementation(boolean delayReturnValue) {
            this.delayReturnValue = delayReturnValue;
        }

        @Override
        public TestDelayedRpcProtos.TestResponse test(RpcController rpcController, TestDelayedRpcProtos.TestArg testArg) throws ServiceException {
            boolean delay = testArg.getDelay();
            TestDelayedRpcProtos.TestResponse.Builder responseBuilder = TestDelayedRpcProtos.TestResponse.newBuilder();
            if (!delay) {
                responseBuilder.setResponse(0);
                return responseBuilder.build();
            }
            final RpcCallContext call = RpcServer.getCurrentCall();
            call.startDelay(this.delayReturnValue);
            new Thread(){

                @Override
                public void run() {
                    try {
                        Thread.sleep(500L);
                        TestDelayedRpcProtos.TestResponse.Builder responseBuilder = TestDelayedRpcProtos.TestResponse.newBuilder();
                        call.endDelay(TestDelayedImplementation.this.delayReturnValue ? responseBuilder.setResponse(1).build() : null);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }.start();
            responseBuilder.setResponse(-559038737);
            return responseBuilder.build();
        }
    }

    private static class ListAppender
    extends AppenderSkeleton {
        private final List<String> messages = new ArrayList<String>();

        private ListAppender() {
        }

        protected void append(LoggingEvent event) {
            this.messages.add(event.getMessage().toString());
        }

        public void close() {
        }

        public boolean requiresLayout() {
            return false;
        }

        public List<String> getMessages() {
            return this.messages;
        }
    }
}

