/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.spi.NullContext;
import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.Service;

public class TestRPC
extends TestCase {
    /** Host string passed to every RPC server created here and to the standalone-client probe. */
    private static final String ADDRESS = "0.0.0.0";
    /** Logger shared by the test methods and their helper threads. */
    public static final Log LOG = LogFactory.getLog(TestRPC.class);
    /** Default configuration used by the tests that do not build their own. */
    private static Configuration conf = new Configuration();
    /** Length of the int array each {@code Transactions} worker sends through {@code exchange}. */
    int datasize = 102400;
    /** Number of concurrent {@code Transactions} threads started by {@code testCalls}. */
    int numThreads = 50;
    /** Key under which {@code TestPolicyProvider} registers the service entry for {@code TestProtocol}. */
    private static final String ACL_CONFIG = "test.protocol.acl";

    /**
     * Creates a test case with the given name by delegating to the {@link TestCase} constructor.
     *
     * @param name the name handed to the superclass
     */
    public TestRPC(String name) {
        super(name);
    }

    /**
     * Verifies that a call blocked inside the server does not prevent other calls on the same
     * proxy from being served.
     *
     * <p>A background thread issues {@code slowPing(true)}, which the in-file {@link TestImpl}
     * parks until two {@code slowPing(false)} calls have arrived. The test thread then sends
     * those two fast pings and waits for the background call to complete.
     *
     * <p>The wait loop also watches the background thread's liveness. If that thread dies
     * without finishing (its assertion fails on its own thread, invisible to JUnit), the test
     * now fails instead of spinning forever.
     *
     * @throws Exception if server start-up, proxy creation or any RPC fails
     */
    public void testSlowRpc() throws Exception {
        System.out.println("Testing Slow RPC");
        // NOTE(review): the literal 2 is presumably the handler count (one parked by the slow
        // call, one left for the fast pings) - confirm against RPC.getServer.
        RPC.Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 2, false, conf);
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);

            SlowRPC slowrpc = new SlowRPC(proxy);
            Thread thread = new Thread(slowrpc, "SlowRPC");
            thread.start();

            assertTrue("Slow RPC should not have finished1.", !slowrpc.isDone());
            proxy.slowPing(false);
            assertTrue("Slow RPC should not have finished2.", !slowrpc.isDone());
            proxy.slowPing(false);

            // Stop polling as soon as the worker is gone: a dead worker can never set 'done'.
            while (!slowrpc.isDone() && thread.isAlive()) {
                System.out.println("Waiting for slow RPC to get done.");
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: keep polling until the slow call resolves.
                }
            }
            assertTrue("Slow RPC thread terminated without completing its call.", slowrpc.isDone());
        } finally {
            // Single cleanup path for both the success and the failure case.
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            System.out.println("Down slow rpc testing");
        }
    }

    /**
     * Exercises the basic RPC surface end to end against an in-process server.
     *
     * <p>Covers, in order: void/String/array/Writable/int round trips, the server-side metrics
     * counters, propagation of a remote {@link IOException}, {@code Server.get()} inside a
     * handler, {@link #numThreads} concurrent {@code Transactions} workers, and the
     * parallel {@code RPC.call} helper.
     *
     * <p>Uncaught failures in worker threads are recorded through an uncaught-exception handler
     * and reported after the join. Previously an assertion failing inside a worker only killed
     * that thread and the test still passed.
     *
     * @throws Exception if any RPC or the server set-up fails unexpectedly
     */
    public void testCalls() throws Exception {
        RPC.Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
        TestProtocol proxy = null;
        try {
            server.start();
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);

            proxy.ping();

            String stringResult = proxy.echo("foo");
            assertEquals("foo", stringResult);
            stringResult = proxy.echo((String) null);
            assertNull(stringResult);

            // NOTE(review): 4 is presumably ping + two echoes + the version call made by
            // getProxy - confirm against the RPC implementation.
            server.rpcMetrics.doUpdates(new NullContext());
            assertEquals(4, server.rpcMetrics.rpcProcessingTime.getPreviousIntervalNumOps());
            assertTrue(server.rpcMetrics.sentBytes.getPreviousIntervalValue() > 0L);
            assertTrue(server.rpcMetrics.receivedBytes.getPreviousIntervalValue() > 0L);

            server.rpcDetailedMetrics.doUpdates(new NullContext());
            MetricsTimeVaryingRate metrics =
                (MetricsTimeVaryingRate) server.rpcDetailedMetrics.registry.get("echo");
            assertEquals(2, metrics.getPreviousIntervalNumOps());
            metrics = (MetricsTimeVaryingRate) server.rpcDetailedMetrics.registry.get("ping");
            assertEquals(1, metrics.getPreviousIntervalNumOps());

            String[] stringResults = proxy.echo(new String[]{"foo", "bar"});
            assertTrue(Arrays.equals(stringResults, new String[]{"foo", "bar"}));
            stringResults = proxy.echo((String[]) null);
            assertNull(stringResults);

            UTF8 utf8Result = (UTF8) proxy.echo((Writable) new UTF8("hello world"));
            assertEquals(new UTF8("hello world"), utf8Result);
            utf8Result = (UTF8) proxy.echo((Writable) null);
            assertNull(utf8Result);

            int intResult = proxy.add(1, 2);
            assertEquals(3, intResult);
            intResult = proxy.add(new int[]{1, 2});
            assertEquals(3, intResult);

            boolean caught = false;
            try {
                proxy.error();
            } catch (IOException e) {
                LOG.debug("Caught " + e);
                caught = true;
            }
            assertTrue(caught);

            proxy.testServerGet();

            System.out.println("Starting multi-threaded RPC test...");
            server.setSocketSendBufSize(1024);

            // Remember the first uncaught failure from any worker so it can fail the test.
            final AtomicReference<Throwable> workerFailure = new AtomicReference<Throwable>();
            Thread.UncaughtExceptionHandler recorder = new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(Thread t, Throwable e) {
                    LOG.error(t.getName() + " failed", e);
                    workerFailure.compareAndSet(null, e);
                }
            };

            Thread[] threadId = new Thread[this.numThreads];
            for (int i = 0; i < this.numThreads; ++i) {
                Transactions trans = new Transactions(proxy, this.datasize);
                threadId[i] = new Thread(trans, "TransactionThread-" + i);
                threadId[i].setUncaughtExceptionHandler(recorder);
                threadId[i].start();
            }

            System.out.println("Waiting for all threads to finish RPCs...");
            for (Thread worker : threadId) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException ignored) {
                        // Retry the join on the same thread, as the original loop did.
                    }
                }
            }
            Throwable failure = workerFailure.get();
            if (failure != null) {
                fail("Transaction thread failed: " + failure);
            }

            // Parallel calls: one invocation per (params, address) pair.
            Method echo = TestProtocol.class.getMethod("echo", String.class);
            String[] strings = (String[]) RPC.call(echo, new String[][]{{"a"}, {"b"}},
                new InetSocketAddress[]{addr, addr}, conf);
            assertTrue(Arrays.equals(strings, new String[]{"a", "b"}));

            Method ping = TestProtocol.class.getMethod("ping");
            Object[] voids = RPC.call(ping, new Object[][]{new Object[0], new Object[0]},
                new InetSocketAddress[]{addr, addr}, conf);
            assertNull(voids);
        } finally {
            // Single cleanup path for both the success and the failure case.
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
    }

    /**
     * Expects {@code RPC.waitForProxy} to give up with a {@link ConnectException} when asked
     * for a proxy at {@code ADDRESS:20}; reaching the line after the call fails the test.
     *
     * <p>NOTE(review): relies on nothing listening on port 20 of the test host, and the
     * {@code 15000L} argument is presumably a timeout in milliseconds - confirm.
     *
     * @throws IOException for any I/O failure other than the expected connect failure
     */
    public void testStandaloneClient() throws IOException {
        InetSocketAddress target = new InetSocketAddress(ADDRESS, 20);
        try {
            RPC.waitForProxy(TestProtocol.class, 1L, target, conf, 15000L);
            fail("We should not have reached here");
        } catch (ConnectException expected) {
            // This is the outcome the test is looking for.
        }
    }

    /**
     * Starts a server with the service ACL taken from {@code conf}, issues one {@code ping}
     * and checks the outcome together with the server's authorization/authentication counters.
     *
     * @param conf          configuration used for the server, the ACL refresh and the proxy
     * @param expectFailure {@code true} if the call must be rejected with a
     *                      {@link RemoteException} wrapping an {@link AuthorizationException};
     *                      {@code false} if it must succeed
     * @throws Exception any unexpected failure, including a {@link RemoteException} when
     *                   {@code expectFailure} is {@code false}
     */
    private void doRPCs(Configuration conf, boolean expectFailure) throws Exception {
        RPC.Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
        server.refreshServiceAcl(conf, new TestPolicyProvider());
        TestProtocol proxy = null;
        server.start();
        InetSocketAddress addr = NetUtils.getConnectAddress(server);
        try {
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, 1L, addr, conf);
            proxy.ping();
            if (expectFailure) {
                fail("Expect RPC.getProxy to fail with AuthorizationException!");
            }
        } catch (RemoteException e) {
            if (!expectFailure) {
                throw e;
            }
            assertTrue(e.unwrapRemoteException() instanceof AuthorizationException);
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
            // Exactly one authorization decision is expected, on the side matching expectFailure.
            if (expectFailure) {
                int failures = (int) server.getRpcMetrics().authorizationFailures.getCurrentIntervalValue();
                assertEquals("Wrong number of authorizationFailures ", 1, failures);
            } else {
                int successes = (int) server.getRpcMetrics().authorizationSuccesses.getCurrentIntervalValue();
                assertEquals("Wrong number of authorizationSuccesses ", 1, successes);
            }
            // Both authentication counters are expected to stay at zero.
            int authnFailures = (int) server.getRpcMetrics().authenticationFailures.getCurrentIntervalValue();
            assertEquals("Wrong number of authenticationFailures ", 0, authnFailures);
            int authnSuccesses = (int) server.getRpcMetrics().authenticationSuccesses.getCurrentIntervalValue();
            assertEquals("Wrong number of authenticationSuccesses ", 0, authnSuccesses);
        }
    }

    /**
     * Runs {@code doRPCs} twice with authorization switched on: first with the ACL value
     * {@code "*"} expecting success, then with {@code "invalid invalid"} expecting rejection.
     *
     * @throws Exception if either round behaves differently from what is expected
     */
    public void testAuthorization() throws Exception {
        Configuration authConf = new Configuration();
        authConf.setBoolean("hadoop.security.authorization", true);

        // Round 1: ACL value "*", call must succeed.
        authConf.set(ACL_CONFIG, "*");
        doRPCs(authConf, false);

        // Round 2: ACL value "invalid invalid", call must be rejected.
        authConf.set(ACL_CONFIG, "invalid invalid");
        doRPCs(authConf, true);
    }

    /**
     * Checks that interrupting one client thread in the middle of its RPCs does not break
     * the calls issued by the other client threads.
     *
     * <p>200 threads, each with its own proxy, line up on a barrier and then call
     * {@code slowPing(false)} in a loop. Thread 0 (the "leader") loops until an exception
     * stops it, at which point it clears {@code leaderRunning}. Every other thread loops
     * while the leader is running and then makes one final call. The test thread keeps
     * interrupting the leader until the flag is cleared. Any exception seen by a non-leader
     * thread is stored in {@code error} and fails the test.
     *
     * <p>The server and all proxies are released in a {@code finally} block. Previously the
     * server was started but never stopped and the proxies were never closed.
     *
     * @throws IOException          if the server or a proxy cannot be created
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    public void testRPCInterrupted3() throws IOException, InterruptedException {
        final Configuration conf = new Configuration();
        final int numConcurrentRPC = 200;
        final TestProtocol[] proxies = new TestProtocol[numConcurrentRPC];
        RPC.Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf);
        server.start();
        try {
            InetSocketAddress addr = NetUtils.getConnectAddress(server);
            final CyclicBarrier barrier = new CyclicBarrier(numConcurrentRPC);
            final CountDownLatch latch = new CountDownLatch(numConcurrentRPC);
            final AtomicBoolean leaderRunning = new AtomicBoolean(true);
            final AtomicReference<Object> error = new AtomicReference<Object>(null);
            Thread leaderThread = null;
            for (int i = 0; i < numConcurrentRPC; ++i) {
                final int num = i;
                final TestProtocol proxy =
                    (TestProtocol) RPC.getProxy(TestProtocol.class, TestProtocol.versionID, addr, conf);
                proxies[i] = proxy;
                Thread rpcThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            barrier.await();
                            // The leader never leaves this loop normally; only an exception ends it.
                            while (num == 0 || leaderRunning.get()) {
                                proxy.slowPing(false);
                            }
                            proxy.slowPing(false);
                        } catch (Exception e) {
                            if (num == 0) {
                                leaderRunning.set(false);
                            } else {
                                error.set(e);
                            }
                            LOG.error(e);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
                rpcThread.start();
                if (leaderThread == null) {
                    leaderThread = rpcThread;
                }
            }
            Thread.sleep(1000L);
            // Keep interrupting until the leader reports that it has been knocked out.
            while (leaderRunning.get()) {
                leaderThread.interrupt();
            }
            latch.await();
            assertNull("rpc got ClosedChannelException", error.get());
        } finally {
            // Same cleanup order as the other tests in this class: server first, then proxies.
            server.stop();
            for (TestProtocol created : proxies) {
                if (created != null) {
                    RPC.stopProxy(created);
                }
            }
        }
    }

    /**
     * Expects proxy creation against a server with security enabled to be rejected with a
     * {@link RemoteException} that unwraps to an {@link AccessControlException}.
     *
     * <p>The remote message is logged; the test fails if no such exception is raised.
     *
     * @throws Exception if server set-up fails or an unexpected exception escapes
     */
    public void testErrorMsgForInsecureClient() throws Exception {
        RPC.Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, 5, true, conf, null);
        server.enableSecurity();
        server.start();
        InetSocketAddress addr = NetUtils.getConnectAddress(server);

        boolean rejected = false;
        TestProtocol proxy = null;
        try {
            proxy = (TestProtocol) RPC.getProxy(TestProtocol.class, 1L, addr, conf);
        } catch (RemoteException e) {
            LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
            assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
            rejected = true;
        } finally {
            server.stop();
            if (proxy != null) {
                RPC.stopProxy(proxy);
            }
        }
        assertTrue(rejected);
    }

    /**
     * Command-line entry point: runs {@code testCalls} on a fresh instance named "test".
     *
     * @param args ignored
     * @throws Exception whatever {@code testCalls} throws
     */
    public static void main(String[] args) throws Exception {
        TestRPC standalone = new TestRPC("test");
        standalone.testCalls();
    }

    /**
     * Policy provider exposing a single service entry that pairs the
     * {@code ACL_CONFIG} key with {@code TestProtocol}.
     */
    private static class TestPolicyProvider extends PolicyProvider {
        private TestPolicyProvider() {
        }

        /** Returns a fresh one-element array holding the {@code TestProtocol} service entry. */
        public Service[] getServices() {
            Service protocolService = new Service(TestRPC.ACL_CONFIG, TestProtocol.class);
            return new Service[]{protocolService};
        }
    }

    /**
     * Runnable that issues one {@code slowPing(true)} call and records its completion
     * in a volatile flag that other threads can poll through {@link #isDone()}.
     */
    static class SlowRPC implements Runnable {
        private final TestProtocol proxy;
        private volatile boolean done = false;

        /**
         * @param proxy the proxy on which the slow call is issued
         */
        SlowRPC(TestProtocol proxy) {
            this.proxy = proxy;
        }

        /** Returns {@code true} once the slow call has returned normally. */
        boolean isDone() {
            return done;
        }

        /** Issues the slow call; an {@link IOException} is turned into an assertion failure. */
        @Override
        public void run() {
            try {
                proxy.slowPing(true);
                done = true;
            } catch (IOException e) {
                Assert.fail("SlowRPC ping exception " + e);
            }
        }
    }

    /**
     * Worker that sends an int array of {@code datasize} elements through {@code exchange},
     * calls {@code add(1, 2)}, and asserts on both results.
     */
    static class Transactions implements Runnable {
        int datasize;
        TestProtocol proxy;

        /**
         * @param proxy    the proxy used for both calls
         * @param datasize number of ints in the array sent to {@code exchange}
         */
        Transactions(TestProtocol proxy, int datasize) {
            this.proxy = proxy;
            this.datasize = datasize;
        }

        /**
         * Performs the two calls and checks that the returned array has the same length as
         * the one sent, that element {@code i} equals {@code i}, and that the sum is 3.
         * An {@link IOException} from either call becomes an assertion failure.
         */
        @Override
        public void run() {
            int[] sent = new int[datasize];
            int[] received = null;
            int sum = 0;
            try {
                received = proxy.exchange(sent);
                sum = proxy.add(1, 2);
            } catch (IOException e) {
                Assert.fail("Exception from RPC exchange() " + e);
            }
            Assert.assertEquals(sent.length, received.length);
            Assert.assertEquals(sum, 3);
            int index = 0;
            while (index < received.length) {
                Assert.assertEquals(received[index], index);
                index++;
            }
        }
    }

    /**
     * Server-side implementation of {@code TestProtocol} used by every test in this file.
     */
    public static class TestImpl implements TestProtocol {
        /** Fast pings received and not yet consumed by a slow ping; guarded by this object's monitor. */
        int fastPingCounter = 0;

        /** Always reports protocol version 1, regardless of the arguments. */
        public long getProtocolVersion(String protocol, long clientVersion) {
            return 1L;
        }

        /** Does nothing. */
        @Override
        public void ping() {
        }

        /**
         * With {@code shouldSlow == false}, records one fast ping and wakes a single waiter.
         * With {@code shouldSlow == true}, blocks until at least two fast pings have been
         * recorded and then consumes two of them. Interrupts while waiting are ignored.
         */
        @Override
        public synchronized void slowPing(boolean shouldSlow) {
            if (!shouldSlow) {
                fastPingCounter++;
                notify();
                return;
            }
            while (fastPingCounter < 2) {
                try {
                    wait();
                } catch (InterruptedException ignored) {
                    // Go round again and re-check the counter.
                }
            }
            fastPingCounter -= 2;
        }

        /** Returns the argument unchanged. */
        @Override
        public String echo(String value) throws IOException {
            return value;
        }

        /** Returns the argument unchanged. */
        @Override
        public String[] echo(String[] values) throws IOException {
            return values;
        }

        /** Returns the argument unchanged. */
        @Override
        public Writable echo(Writable writable) {
            return writable;
        }

        /** Returns {@code v1 + v2}. */
        @Override
        public int add(int v1, int v2) {
            return v1 + v2;
        }

        /** Returns the sum of all elements; 0 for an empty array. */
        @Override
        public int add(int[] values) {
            int total = 0;
            for (int value : values) {
                total += value;
            }
            return total;
        }

        /** Always throws an {@link IOException} with the message "bobo". */
        @Override
        public int error() throws IOException {
            throw new IOException("bobo");
        }

        /** Throws an {@link IOException} unless {@code Server.get()} yields an {@code RPC.Server}. */
        @Override
        public void testServerGet() throws IOException {
            if (Server.get() instanceof RPC.Server) {
                return;
            }
            throw new IOException("Server.get() failed");
        }

        /** Overwrites each element with its own index and returns the same array. */
        @Override
        public int[] exchange(int[] values) {
            int position = 0;
            while (position < values.length) {
                values[position] = position;
                position++;
            }
            return values;
        }
    }

    /**
     * RPC protocol exercised by the tests in this file; the in-file implementation is
     * {@code TestImpl}.
     */
    public interface TestProtocol extends VersionedProtocol {
        /** Protocol version constant. */
        long versionID = 1L;

        /** No-argument, no-result call. */
        void ping() throws IOException;

        /** Call whose flag selects between the blocking and non-blocking paths of {@code TestImpl}. */
        void slowPing(boolean var1) throws IOException;

        /** String round trip. */
        String echo(String var1) throws IOException;

        /** String-array round trip. */
        String[] echo(String[] var1) throws IOException;

        /** {@link Writable} round trip. */
        Writable echo(Writable var1) throws IOException;

        /** Two-operand integer addition. */
        int add(int var1, int var2) throws IOException;

        /** Integer addition over an array. */
        int add(int[] var1) throws IOException;

        /** Call used by the tests to provoke an {@link IOException}. */
        int error() throws IOException;

        /** Call used to check {@code Server.get()} from inside a handler. */
        void testServerGet() throws IOException;

        /** Int-array round trip. */
        int[] exchange(int[] var1) throws IOException;

        /** Redeclares {@link Object#toString()} as part of the protocol interface. */
        String toString();
    }
}

