/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client;

import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider;
import org.apache.hadoop.yarn.client.RMProxy;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestHedgingRMFailoverProxyProvider<T>
extends ConfiguredRMFailoverProxyProvider<T> {
    private static final Logger LOG = LoggerFactory.getLogger(RequestHedgingRMFailoverProxyProvider.class);
    private volatile String successfulProxy = null;
    private FailoverProxyProvider.ProxyInfo<T> wrappedProxy = null;
    private Map<String, T> nonRetriableProxy = new HashMap<String, T>();

    @Override
    public void init(Configuration configuration, RMProxy<T> rmProxy, Class<T> protocol) {
        super.init(configuration, rmProxy, protocol);
        HashMap retriableProxies = new HashMap();
        String originalId = HAUtil.getRMHAId((Configuration)this.conf);
        for (String rmId : this.rmServiceIds) {
            this.conf.set("yarn.resourcemanager.ha.id", rmId);
            this.nonRetriableProxy.put(rmId, super.getProxyInternal());
            T proxy = this.createRetriableProxy();
            FailoverProxyProvider.ProxyInfo pInfo = new FailoverProxyProvider.ProxyInfo(proxy, rmId);
            retriableProxies.put(rmId, pInfo);
        }
        this.conf.set("yarn.resourcemanager.ha.id", originalId);
        Object proxyInstance = Proxy.newProxyInstance(RMRequestHedgingInvocationHandler.class.getClassLoader(), new Class[]{protocol}, (InvocationHandler)new RMRequestHedgingInvocationHandler(retriableProxies));
        String combinedInfo = Arrays.toString(this.rmServiceIds);
        this.wrappedProxy = new FailoverProxyProvider.ProxyInfo(proxyInstance, combinedInfo);
        LOG.info("Created wrapped proxy for " + combinedInfo);
    }

    protected T createRetriableProxy() {
        try {
            RetryPolicy retryPolicy = RMProxy.createRetryPolicy((Configuration)this.conf, false);
            InetSocketAddress rmAddress = this.rmProxy.getRMAddress(this.conf, this.protocol);
            Object proxy = this.rmProxy.getProxy((Configuration)this.conf, this.protocol, rmAddress);
            return (T)RetryProxy.create((Class)this.protocol, proxy, (RetryPolicy)retryPolicy);
        }
        catch (IOException ioe) {
            LOG.error("Unable to create proxy to the ResourceManager " + HAUtil.getRMHAId((Configuration)this.conf), (Throwable)ioe);
            return null;
        }
    }

    @Override
    public FailoverProxyProvider.ProxyInfo<T> getProxy() {
        return this.wrappedProxy;
    }

    @Override
    public void performFailover(T currentProxy) {
        LOG.info("Connection lost, trying to fail over.");
        this.successfulProxy = null;
    }

    class RMRequestHedgingInvocationHandler
    implements InvocationHandler {
        private final Map<String, FailoverProxyProvider.ProxyInfo<T>> allProxies;

        public RMRequestHedgingInvocationHandler(Map<String, FailoverProxyProvider.ProxyInfo<T>> allProxies) {
            this.allProxies = new HashMap(allProxies);
        }

        protected Object invokeMethod(Object proxy, Method method, Object[] args) throws Throwable {
            try {
                return method.invoke(proxy, args);
            }
            catch (InvocationTargetException ex) {
                throw ex.getCause();
            }
        }

        private Throwable extraRootException(Exception ex) {
            Throwable cause;
            Throwable rootCause = ex;
            if (ex instanceof ExecutionException && (cause = ex.getCause()) instanceof InvocationTargetException) {
                rootCause = cause.getCause();
            }
            return rootCause;
        }

        @Override
        public Object invoke(Object proxy, final Method method, final Object[] args) throws Throwable {
            if (RequestHedgingRMFailoverProxyProvider.this.successfulProxy != null) {
                return this.invokeMethod(RequestHedgingRMFailoverProxyProvider.this.nonRetriableProxy.get(RequestHedgingRMFailoverProxyProvider.this.successfulProxy), method, args);
            }
            ExecutorService executor = null;
            try {
                Object pInfo2;
                HashMap proxyMap = new HashMap();
                executor = HadoopExecutors.newFixedThreadPool((int)this.allProxies.size());
                ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(executor);
                for (final Object pInfo2 : this.allProxies.values()) {
                    Callable<Object> c = new Callable<Object>(){

                        @Override
                        public Object call() throws Exception {
                            return method.invoke(pInfo2.proxy, args);
                        }
                    };
                    proxyMap.put(completionService.submit(c), (FailoverProxyProvider.ProxyInfo)pInfo2);
                }
                Future callResultFuture = completionService.take();
                RequestHedgingRMFailoverProxyProvider.this.successfulProxy = pInfo2 = ((FailoverProxyProvider.ProxyInfo)proxyMap.get(callResultFuture)).proxyInfo;
                try {
                    Object retVal = callResultFuture.get();
                    LOG.info("Invocation successful on [" + pInfo2 + "]");
                    Object v = retVal;
                    return v;
                }
                catch (Exception ex) {
                    Throwable rootCause = this.extraRootException(ex);
                    LOG.warn("Invocation returned exception: " + rootCause.toString() + " on [" + pInfo2 + "], so propagating back to caller.");
                    throw rootCause;
                }
            }
            finally {
                if (executor != null) {
                    executor.shutdownNow();
                }
            }
        }
    }
}

