/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.ipc;

import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ClientCache;
import org.apache.hadoop.ipc.ProtocolProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcEngine;
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl;
import org.apache.hadoop.yarn.ipc.RpcProtos;

@InterfaceStability.Evolving
public class ProtoOverHadoopRpcEngine
implements RpcEngine {
    private static final Log LOG = LogFactory.getLog(RPC.class);
    private static final ClientCache CLIENTS = new ClientCache();

    /**
     * Builds a client-side proxy for {@code protocol}.
     *
     * <p>The returned proxy is a {@link java.lang.reflect.Proxy} instance implementing
     * {@code protocol}; every call on it is routed through a new {@link Invoker} bound to
     * {@code addr}, {@code ticket}, {@code conf}, {@code factory} and {@code rpcTimeout}.
     * {@code clientVersion} is not used by this engine.
     *
     * @param protocol      the interface the proxy must implement
     * @param clientVersion ignored
     * @param addr          address of the remote server
     * @param ticket        identity used when building the connection id
     * @param conf          configuration handed to the connection id and the client cache
     * @param factory       socket factory used to obtain the underlying client
     * @param rpcTimeout    timeout handed to the connection id
     * @return a {@link ProtocolProxy} wrapping the dynamic proxy
     * @throws IOException if the {@link Invoker} cannot be created
     */
    public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
        // Resolve the class loader first so the evaluation order matches a single nested expression.
        ClassLoader loader = protocol.getClassLoader();
        Class<?>[] proxiedInterfaces = new Class<?>[]{protocol};
        Invoker handler = new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout);
        Object stub = Proxy.newProxyInstance(loader, proxiedInterfaces, (InvocationHandler)handler);
        return new ProtocolProxy(protocol, stub, false);
    }

    /**
     * Releases the resources held by a proxy previously created by this engine.
     *
     * <p>The proxy's invocation handler is expected to be an {@link Invoker}; it is closed and
     * any {@link IOException} raised while closing is logged rather than propagated.
     *
     * @param proxy a proxy obtained from {@code getProxy}
     */
    public void stopProxy(Object proxy) {
        try {
            ((Invoker)((Object)Proxy.getInvocationHandler(proxy))).close();
        }
        catch (IOException e) {
            // Do not concatenate the proxy itself: String conversion would call toString() on the
            // dynamic proxy, which is dispatched to Invoker.invoke and may fail, hiding the
            // original IOException. getClass() is final and never goes through the handler.
            LOG.warn((Object)("Error while stopping " + proxy.getClass().getName()), (Throwable)e);
        }
    }

    /**
     * Not supported by this engine.
     *
     * @param method ignored
     * @param params ignored
     * @param addrs  ignored
     * @param ticket ignored
     * @param conf   ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, UserGroupInformation ticket, Configuration conf) throws IOException, InterruptedException {
        // Carry a message so the failure is self-explanatory in logs and stack traces.
        throw new UnsupportedOperationException("call(Method, Object[][], InetSocketAddress[], UserGroupInformation, Configuration) is not supported by ProtoOverHadoopRpcEngine");
    }

    /**
     * Looks up a {@link Client} in the engine-wide client cache, using the JVM default
     * socket factory and {@link ProtoSpecificResponseWritable} as the response value class.
     *
     * @param conf configuration passed to the cache lookup
     * @return the client returned by the cache
     */
    @InterfaceAudience.Private
    @InterfaceStability.Unstable
    static Client getClient(Configuration conf) {
        SocketFactory defaultFactory = SocketFactory.getDefault();
        Class<ProtoSpecificResponseWritable> responseClass = ProtoSpecificResponseWritable.class;
        return CLIENTS.getClient(conf, defaultFactory, responseClass);
    }

    /**
     * Writes {@code value} to the engine log at INFO level, shortening it to its first
     * 55 characters followed by {@code "..."} when it is longer than that.
     *
     * @param value the text to log; {@code null} is logged as-is
     */
    private static void log(String value) {
        String text = value;
        boolean needsTruncation = text != null && text.length() > 55;
        if (needsTruncation) {
            String head = text.substring(0, 55);
            text = head + "...";
        }
        LOG.info((Object)text);
    }

    /**
     * Creates the server side of this engine for the given service instance.
     *
     * <p>All arguments except {@code protocol} are forwarded to the {@link Server}
     * constructor; {@code protocol} is not used here.
     *
     * @param protocol            ignored
     * @param instance            the service implementation handed to {@link Server}
     * @param bindAddress         address to bind to
     * @param port                port to listen on
     * @param numHandlers         number of handlers
     * @param numReaders          number of readers
     * @param queueSizePerHandler queue size per handler
     * @param verbose             whether the server logs each call
     * @param conf                configuration for the server
     * @param secretManager       secret manager passed to the server
     * @param portRangeConfig     port range configuration passed to the server
     * @return the newly constructed server
     * @throws IOException if the server cannot be constructed
     */
    public RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
        RPC.Server rpcServer = new Server(instance, conf, bindAddress, port, numHandlers, numReaders, queueSizePerHandler, verbose, secretManager, portRangeConfig);
        return rpcServer;
    }

    public static class Server
    extends RPC.Server {
        private BlockingService service;
        private boolean verbose;

        private static String classNameBase(String className) {
            String[] names = className.split("\\.", -1);
            if (names == null || names.length == 0) {
                return className;
            }
            return names[names.length - 1];
        }

        public Server(Object instance, Configuration conf, String bindAddress, int port, int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose, SecretManager<? extends TokenIdentifier> secretManager, String portRangeConfig) throws IOException {
            super(bindAddress, port, ProtoSpecificRequestWritable.class, numHandlers, numReaders, queueSizePerHandler, conf, Server.classNameBase(instance.getClass().getName()), secretManager, portRangeConfig);
            this.service = (BlockingService)instance;
            this.verbose = verbose;
        }

        public Writable call(Class<?> protocol, Writable writableRequest, long receiveTime) throws IOException {
            Message result;
            Descriptors.MethodDescriptor methodDescriptor;
            ProtoSpecificRequestWritable request = (ProtoSpecificRequestWritable)writableRequest;
            RpcProtos.ProtoSpecificRpcRequest rpcRequest = request.message;
            String methodName = rpcRequest.getMethodName();
            if (this.verbose) {
                ProtoOverHadoopRpcEngine.log("Call: protocol=" + protocol.getCanonicalName() + ", method=" + methodName);
            }
            if ((methodDescriptor = this.service.getDescriptorForType().findMethodByName(methodName)) == null) {
                String msg = "Unknown method " + methodName + " called on " + protocol + " protocol.";
                LOG.warn((Object)msg);
                return this.handleException(new IOException(msg));
            }
            Message prototype = this.service.getRequestPrototype(methodDescriptor);
            Message param = prototype.newBuilderForType().mergeFrom(rpcRequest.getRequestProto()).build();
            try {
                long startTime = System.currentTimeMillis();
                result = this.service.callBlockingMethod(methodDescriptor, null, param);
                int processingTime = (int)(System.currentTimeMillis() - startTime);
                int qTime = (int)(startTime - receiveTime);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Served: " + methodName + " queueTime= " + qTime + " procesingTime= " + processingTime));
                }
                this.rpcMetrics.addRpcQueueTime(qTime);
                this.rpcMetrics.addRpcProcessingTime(processingTime);
                this.rpcDetailedMetrics.addProcessingTime(methodName, processingTime);
            }
            catch (ServiceException e) {
                e.printStackTrace();
                return this.handleException(e);
            }
            catch (Exception e) {
                return this.handleException(e);
            }
            RpcProtos.ProtoSpecificRpcResponse response = this.constructProtoSpecificRpcSuccessResponse(result);
            return new ProtoSpecificResponseWritable(response);
        }

        private ProtoSpecificResponseWritable handleException(Throwable e) {
            RpcProtos.ProtoSpecificRpcResponse.Builder builder = RpcProtos.ProtoSpecificRpcResponse.newBuilder();
            builder.setIsError(true);
            if (e.getCause() instanceof YarnRemoteExceptionPBImpl) {
                builder.setException(((YarnRemoteExceptionPBImpl)e.getCause()).getProto());
            } else {
                builder.setException(new YarnRemoteExceptionPBImpl(e).getProto());
            }
            RpcProtos.ProtoSpecificRpcResponse response = builder.build();
            return new ProtoSpecificResponseWritable(response);
        }

        private RpcProtos.ProtoSpecificRpcResponse constructProtoSpecificRpcSuccessResponse(Message message) {
            RpcProtos.ProtoSpecificRpcResponse res = RpcProtos.ProtoSpecificRpcResponse.newBuilder().setResponseProto(message.toByteString()).build();
            return res;
        }
    }

    /**
     * {@link Writable} envelope around a {@link RpcProtos.ProtoSpecificRpcResponse}.
     *
     * <p>Wire format: a 4-byte length written with {@link DataOutput#writeInt(int)} followed by
     * that many bytes of the serialized message.
     */
    public static class ProtoSpecificResponseWritable
    implements Writable {
        RpcProtos.ProtoSpecificRpcResponse message;

        /** Creates an empty instance to be populated by {@link #readFields(DataInput)}. */
        public ProtoSpecificResponseWritable() {
        }

        /** Creates an instance wrapping {@code message}. */
        public ProtoSpecificResponseWritable(RpcProtos.ProtoSpecificRpcResponse message) {
            this.message = message;
        }

        /**
         * Writes the length-prefixed serialized message to {@code out}.
         *
         * @throws IOException if writing to {@code out} fails
         */
        public void write(DataOutput out) throws IOException {
            // Serialize once and reuse the bytes for both the length prefix and the payload.
            byte[] bytes = this.message.toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
        }

        /**
         * Reads a length-prefixed serialized message from {@code in} and parses it.
         *
         * @throws IOException if the length prefix is negative, the stream ends early,
         *                     or the payload cannot be parsed
         */
        public void readFields(DataInput in) throws IOException {
            int length = in.readInt();
            if (length < 0) {
                // Fail with an I/O error instead of a NegativeArraySizeException on corrupt input.
                throw new IOException("Invalid negative message length: " + length);
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            this.message = RpcProtos.ProtoSpecificRpcResponse.parseFrom(bytes);
        }
    }

    /**
     * {@link Writable} envelope around a {@link RpcProtos.ProtoSpecificRpcRequest}.
     *
     * <p>Wire format: a 4-byte length written with {@link DataOutput#writeInt(int)} followed by
     * that many bytes of the serialized message.
     */
    private static class ProtoSpecificRequestWritable
    implements Writable {
        RpcProtos.ProtoSpecificRpcRequest message;

        /** Creates an empty instance to be populated by {@link #readFields(DataInput)}. */
        public ProtoSpecificRequestWritable() {
        }

        /** Creates an instance wrapping {@code message}. */
        ProtoSpecificRequestWritable(RpcProtos.ProtoSpecificRpcRequest message) {
            this.message = message;
        }

        /**
         * Writes the length-prefixed serialized message to {@code out}.
         *
         * @throws IOException if writing to {@code out} fails
         */
        public void write(DataOutput out) throws IOException {
            // Serialize once and reuse the bytes for both the length prefix and the payload.
            byte[] bytes = this.message.toByteArray();
            out.writeInt(bytes.length);
            out.write(bytes);
        }

        /**
         * Reads a length-prefixed serialized message from {@code in} and parses it.
         *
         * @throws IOException if the length prefix is negative, the stream ends early,
         *                     or the payload cannot be parsed
         */
        public void readFields(DataInput in) throws IOException {
            int length = in.readInt();
            if (length < 0) {
                // Fail with an I/O error instead of a NegativeArraySizeException on corrupt input.
                throw new IOException("Invalid negative message length: " + length);
            }
            byte[] bytes = new byte[length];
            in.readFully(bytes);
            this.message = RpcProtos.ProtoSpecificRpcRequest.parseFrom(bytes);
        }
    }

    /**
     * Client-side invocation handler: turns each call on a protocol proxy into a
     * {@link ProtoSpecificRequestWritable} sent through a cached {@link Client}, and decodes the
     * {@link ProtoSpecificResponseWritable} that comes back.
     */
    private static class Invoker
    implements RpcInvocationHandler,
    Closeable {
        /** Default instances of the return message types, keyed by method name. */
        private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
        private boolean isClosed = false;
        private Client.ConnectionId remoteId;
        private Client client;

        /**
         * Builds the connection id for the remote endpoint and obtains a client from the
         * engine-wide cache.
         *
         * @throws IOException if the connection id cannot be created
         */
        public Invoker(Class<?> protocol, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {
            this.remoteId = Client.ConnectionId.getConnectionId((InetSocketAddress)addr, protocol, (UserGroupInformation)ticket, (int)rpcTimeout, (Configuration)conf);
            this.client = CLIENTS.getClient(conf, factory, ProtoSpecificResponseWritable.class);
        }

        /**
         * Builds the wire request for {@code method}.
         *
         * <p>Exactly two arguments are required; only the second one, which must be a
         * non-null {@link Message}, is serialized into the request.
         *
         * @throws ServiceException if the argument count is not two or the second argument is null
         */
        private RpcProtos.ProtoSpecificRpcRequest constructRpcRequest(Method method, Object[] params) throws ServiceException {
            // The reflection proxy passes null (not an empty array) for methods without arguments.
            int actual = params == null ? 0 : params.length;
            if (actual != 2) {
                String problem = actual > 2 ? "Too many" : "Too few";
                throw new ServiceException(problem + " parameters for request. Method: [" + method.getName() + "]" + ", Expected: 2, Actual: " + actual);
            }
            if (params[1] == null) {
                throw new ServiceException("null param while calling Method: [" + method.getName() + "]");
            }
            RpcProtos.ProtoSpecificRpcRequest.Builder builder = RpcProtos.ProtoSpecificRpcRequest.newBuilder();
            builder.setMethodName(method.getName());
            Message param = (Message)params[1];
            builder.setRequestProto(param.toByteString());
            return builder.build();
        }

        /** Returns the connection id this handler sends its calls to. */
        public Client.ConnectionId getConnectionId() {
            return this.remoteId;
        }

        /**
         * Answers {@code hashCode}, {@code equals} and {@code toString} for the proxy locally,
         * using identity semantics, so they never trigger a remote call.
         */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            String name = method.getName();
            if ("equals".equals(name)) {
                return Boolean.valueOf(proxy == args[0]);
            }
            if ("hashCode".equals(name)) {
                return Integer.valueOf(System.identityHashCode(proxy));
            }
            return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy));
        }

        /**
         * Performs one remote call.
         *
         * @param proxy  the proxy the method was invoked on
         * @param method the invoked method; its return type must expose a static
         *               {@code getDefaultInstance()} used to decode the response
         * @param args   the call arguments (two are expected, the second being the request message)
         * @return the decoded response message
         * @throws ServiceException if the request cannot be built, the transport fails, the
         *                          server reports an error, or the response prototype cannot
         *                          be resolved
         */
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods inherited from Object are dispatched here with Object as declaring class;
            // they are not part of the remote protocol and must not be sent over the wire.
            if (method.getDeclaringClass() == Object.class) {
                return this.invokeObjectMethod(proxy, method, args);
            }
            long startTime = 0L;
            if (LOG.isDebugEnabled()) {
                startTime = System.currentTimeMillis();
            }
            RpcProtos.ProtoSpecificRpcRequest rpcRequest = this.constructRpcRequest(method, args);
            ProtoSpecificResponseWritable val = null;
            try {
                val = (ProtoSpecificResponseWritable)this.client.call((Writable)new ProtoSpecificRequestWritable(rpcRequest), this.remoteId);
            }
            catch (Exception e) {
                throw new ServiceException((Throwable)e);
            }
            RpcProtos.ProtoSpecificRpcResponse response = val.message;
            if (LOG.isDebugEnabled()) {
                long callTime = System.currentTimeMillis() - startTime;
                LOG.debug((Object)("Call: " + method.getName() + " " + callTime));
            }
            if (response.hasIsError() && response.getIsError()) {
                // Rebuild the remote exception locally and surface it as the cause.
                YarnRemoteExceptionPBImpl exception = new YarnRemoteExceptionPBImpl(response.getException());
                exception.fillInStackTrace();
                ServiceException se = new ServiceException((Throwable)exception);
                throw se;
            }
            Message prototype = null;
            try {
                prototype = this.getReturnProtoType(method);
            }
            catch (Exception e) {
                throw new ServiceException((Throwable)e);
            }
            Message actualReturnMessage = prototype.newBuilderForType().mergeFrom(response.getResponseProto()).build();
            return actualReturnMessage;
        }

        /**
         * Hands the client back to the engine-wide cache. Calls after the first have no effect.
         */
        @Override
        public void close() throws IOException {
            if (!this.isClosed) {
                this.isClosed = true;
                CLIENTS.stopClient(this.client);
            }
        }

        /**
         * Returns the default instance of {@code method}'s return type, resolving it through
         * the type's static {@code getDefaultInstance()} on first use and caching it by
         * method name afterwards.
         *
         * @throws Exception if {@code getDefaultInstance()} cannot be found or invoked
         */
        private Message getReturnProtoType(Method method) throws Exception {
            String key = method.getName();
            // The map never stores null, so a single lookup replaces containsKey + get.
            Message cached = this.returnTypes.get(key);
            if (cached != null) {
                return cached;
            }
            Class<?> returnType = method.getReturnType();
            Method newInstMethod = returnType.getMethod("getDefaultInstance", new Class[0]);
            newInstMethod.setAccessible(true);
            Message prototype = (Message)newInstMethod.invoke(null, (Object[])null);
            this.returnTypes.put(key, prototype);
            return prototype;
        }
    }
}

