/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.registry.impl;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ServiceRegistry} implementation that publishes LLAP daemons to, and discovers them
 * from, the registry reached through a {@link RegistryOperationsService}.
 *
 * <p>A daemon calls {@link #register()} to publish a {@link ServiceRecord} describing its
 * endpoints. Non-daemon users call {@link #getInstances(String)} to obtain a view of the
 * registered workers; when the object was constructed with {@code isDaemon == false},
 * {@link #start()} also schedules a background task that refreshes that view every
 * {@code refreshDelay} seconds.
 */
public class LlapYarnRegistryImpl implements ServiceRegistry {
    // LOG is declared first because the static initializer below logs through it.
    private static final Logger LOG = LoggerFactory.getLogger(LlapYarnRegistryImpl.class);

    /** Endpoint API names used inside the published {@link ServiceRecord}. */
    private static final String IPC_SERVICES = "services";
    private static final String IPC_MNG = "llapmng";
    private static final String IPC_SHUFFLE = "shuffle";
    private static final String IPC_LLAP = "llap";

    /** Record attribute under which each process stores its random identifier. */
    private static final String UNIQUE_IDENTIFIER = "llap.unique.id";
    private static final String SERVICE_CLASS = "org-apache-hive";

    /** Random identifier generated once per class load and published by {@link #register()}. */
    private static final UUID uniq = UUID.randomUUID();
    /** Canonical local host name, or {@code "localhost"} when it cannot be resolved. */
    private static final String hostname;

    static {
        String localhost = "localhost";
        try {
            localhost = InetAddress.getLocalHost().getCanonicalHostName();
        }
        catch (UnknownHostException e) {
            // Best effort: keep the "localhost" default, but make the fallback visible.
            LOG.warn("Could not resolve the local host name, falling back to " + localhost, e);
        }
        hostname = localhost;
    }

    private final RegistryOperationsService client;
    private final Configuration conf;
    private final RegistryUtils.ServiceRecordMarshal encoder;
    private final String path;
    private final DynamicServiceInstanceSet instances = new DynamicServiceInstanceSet();
    final ScheduledExecutorService refresher = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LlapYarnRegistryRefresher").build());
    final long refreshDelay;
    private final boolean isDaemon;

    /**
     * Creates the registry client and computes the registry path for this instance.
     *
     * @param instanceName registry id; used as the service name when building the path
     * @param conf source configuration; it is copied, and the copy (not the argument) gets
     *        {@code yarn-site.xml} added as a resource
     * @param isDaemon when {@code true}, {@link #start()} does not schedule the periodic refresh
     * @throws IllegalArgumentException if the configured refresh interval is not positive
     */
    public LlapYarnRegistryImpl(String instanceName, Configuration conf, boolean isDaemon) {
        LOG.info("Llap Registry is enabled with registryid: " + instanceName);
        this.conf = new Configuration(conf);
        // Add the resource to the private copy so the caller's Configuration is left untouched.
        this.conf.addResource("yarn-site.xml");
        this.client = (RegistryOperationsService)RegistryOperationsFactory.createInstance(this.conf);
        this.encoder = new RegistryUtils.ServiceRecordMarshal();
        this.path = RegistryPathUtils.join(RegistryUtils.componentPath(RegistryUtils.currentUser(), SERVICE_CLASS, instanceName, "workers"), "worker-");
        this.refreshDelay = HiveConf.getTimeVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_SERVICE_REFRESH_INTERVAL, TimeUnit.SECONDS);
        this.isDaemon = isDaemon;
        // Guava's Preconditions only substitutes %s placeholders.
        Preconditions.checkArgument(this.refreshDelay > 0L, "Refresh delay for registry has to be positive = %s", this.refreshDelay);
    }

    /**
     * Builds the IPC endpoint for the local host and the configured LLAP daemon RPC port.
     *
     * @return the endpoint published under the {@code "llap"} API name
     */
    public Endpoint getRpcEndpoint() {
        int rpcPort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
        return RegistryTypeUtils.ipcEndpoint(IPC_LLAP, new InetSocketAddress(hostname, rpcPort));
    }

    /**
     * Builds the {@code tcp} address endpoint for the local host and the configured shuffle port.
     *
     * @return the endpoint published under the {@code "shuffle"} API name
     */
    public Endpoint getShuffleEndpoint() {
        int shufflePort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
        return RegistryTypeUtils.inetAddrEndpoint(IPC_SHUFFLE, "tcp", hostname, shufflePort);
    }

    /**
     * Builds the web endpoint for the local host and the configured web port, using
     * {@code https} when web SSL is enabled in the configuration and {@code http} otherwise.
     *
     * @return the endpoint published under the {@code "services"} API name
     * @throws RuntimeException if the resulting URL or URI is invalid
     */
    public Endpoint getServicesEndpoint() {
        int servicePort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_PORT);
        boolean isSSL = HiveConf.getBoolVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_WEB_SSL);
        String scheme = isSSL ? "https" : "http";
        try {
            URL serviceURL = new URL(scheme, hostname, servicePort, "");
            return RegistryTypeUtils.webEndpoint(IPC_SERVICES, new URI[]{serviceURL.toURI()});
        }
        catch (MalformedURLException e) {
            throw new RuntimeException("llap service URL for " + hostname + " is invalid", e);
        }
        catch (URISyntaxException e) {
            throw new RuntimeException("llap service URI for " + hostname + " is invalid", e);
        }
    }

    /**
     * Builds the IPC endpoint for the local host and the configured management RPC port.
     *
     * @return the endpoint published under the {@code "llapmng"} API name
     */
    public Endpoint getMngEndpoint() {
        int mngPort = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
        return RegistryTypeUtils.ipcEndpoint(IPC_MNG, new InetSocketAddress(hostname, mngPort));
    }

    /** Returns the registry path prefix computed in the constructor. */
    private final String getPath() {
        return this.path;
    }

    /**
     * Publishes this process as a worker.
     *
     * <p>The record carries the RPC, management and shuffle endpoints as internal endpoints,
     * the web endpoint as an external endpoint, every configuration entry whose key starts
     * with {@code "llap.daemon."} or {@code "hive.llap."}, and the process-wide unique id.
     * The parent path is created if needed and the record is written as an
     * {@code EPHEMERAL_SEQUENTIAL} node.
     *
     * @throws IOException if the registry operations fail
     */
    @Override
    public void register() throws IOException {
        String path = this.getPath();
        ServiceRecord srv = new ServiceRecord();
        srv.addInternalEndpoint(this.getRpcEndpoint());
        srv.addInternalEndpoint(this.getMngEndpoint());
        srv.addInternalEndpoint(this.getShuffleEndpoint());
        srv.addExternalEndpoint(this.getServicesEndpoint());
        for (Map.Entry<String, String> kv : this.conf) {
            String key = kv.getKey();
            if (key.startsWith("llap.daemon.") || key.startsWith("hive.llap.")) {
                srv.set(key, kv.getValue());
            }
        }
        srv.set(UNIQUE_IDENTIFIER, uniq.toString());
        this.client.mknode(RegistryPathUtils.parentOf(path), true);
        this.client.zkCreate(path, CreateMode.EPHEMERAL_SEQUENTIAL, this.encoder.toBytes(srv), this.client.getClientAcls());
    }

    /**
     * Does nothing.
     *
     * <p>NOTE(review): the record is created as an ephemeral node in {@link #register()}, so it
     * is presumably removed when the client session ends - confirm before relying on it.
     */
    @Override
    public void unregister() throws IOException {
    }

    /**
     * Refreshes and returns the set of known LLAP workers.
     *
     * @param component must be {@code "LLAP"}
     * @return the live, shared instance set (the same object on every call)
     * @throws IllegalArgumentException if {@code component} is not {@code "LLAP"}
     * @throws NullPointerException if the registry client is missing
     * @throws IOException if the registry cannot be read
     */
    @Override
    public ServiceInstanceSet getInstances(String component) throws IOException {
        Preconditions.checkArgument("LLAP".equals(component));
        Preconditions.checkNotNull(this.client, "Yarn registry client is not initialized");
        this.instances.refresh();
        return this.instances;
    }

    /**
     * Starts the registry client and, unless this object was created for a daemon, schedules
     * a refresh of the instance set immediately and then every {@code refreshDelay} seconds.
     */
    @Override
    public void start() {
        if (this.client == null) {
            return;
        }
        this.client.start();
        if (this.isDaemon) {
            return;
        }
        this.refresher.scheduleWithFixedDelay(new Runnable(){

            @Override
            public void run() {
                try {
                    LlapYarnRegistryImpl.this.instances.refresh();
                }
                catch (Exception e) {
                    // An exception escaping a scheduled task suppresses every later run, so
                    // unchecked failures are logged here as well instead of propagating.
                    LOG.warn("Could not refresh hosts during scheduled refresh", e);
                }
            }
        }, 0L, this.refreshDelay, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic refresh and then the registry client. The refresher cannot be
     * rescheduled afterwards.
     */
    @Override
    public void stop() {
        // Stop refreshing first so no refresh is started against a stopped client.
        this.refresher.shutdownNow();
        if (this.client != null) {
            this.client.stop();
        }
    }

    /**
     * View of the registered workers, keyed by worker identity and guarded by a read/write
     * lock. {@link #refresh()} is the only mutator.
     */
    private class DynamicServiceInstanceSet
    implements ServiceInstanceSet {
        private final Map<String, ServiceInstance> instances = new LinkedHashMap<String, ServiceInstance>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private final ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
        private final ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();

        private DynamicServiceInstanceSet() {
        }

        /** Returns a copy of the current identity-to-instance map. */
        @Override
        public Map<String, ServiceInstance> getAll() {
            this.readLock.lock();
            try {
                return new LinkedHashMap<String, ServiceInstance>(this.instances);
            }
            finally {
                this.readLock.unlock();
            }
        }

        /** Returns a copy of the current instances sorted by ascending worker identity. */
        @Override
        public List<ServiceInstance> getAllInstancesOrdered() {
            LinkedList<ServiceInstance> list = new LinkedList<ServiceInstance>();
            this.readLock.lock();
            try {
                list.addAll(this.instances.values());
            }
            finally {
                this.readLock.unlock();
            }
            Collections.sort(list, new Comparator<ServiceInstance>(){

                @Override
                public int compare(ServiceInstance o1, ServiceInstance o2) {
                    // Compare the two operands with each other; comparing one operand with
                    // itself always yields 0 and leaves the list unsorted.
                    return o1.getWorkerIdentity().compareTo(o2.getWorkerIdentity());
                }
            });
            return list;
        }

        /**
         * Looks up an instance by worker identity.
         *
         * @return the instance, or {@code null} when the identity is unknown
         */
        @Override
        public ServiceInstance getInstance(String name) {
            this.readLock.lock();
            try {
                return this.instances.get(name);
            }
            finally {
                this.readLock.unlock();
            }
        }

        /**
         * Re-reads the service records under the parent of the registry path and reconciles
         * the map with them: unknown identities are added, identities that are still listed
         * keep their existing instance object, and identities that are no longer listed are
         * marked dead via {@link DynamicServiceInstance#kill()} and removed.
         *
         * @throws IOException if the records cannot be listed or a record cannot be parsed
         */
        @Override
        public void refresh() throws IOException {
            String parent = RegistryPathUtils.parentOf(LlapYarnRegistryImpl.this.getPath());
            // The registry is read before taking the write lock.
            Map<String, ServiceRecord> records = RegistryUtils.listServiceRecords((RegistryOperations)LlapYarnRegistryImpl.this.client, parent);
            Map<String, ServiceInstance> freshInstances = new HashMap<String, ServiceInstance>();
            this.writeLock.lock();
            try {
                Set<String> latestKeys = new HashSet<String>();
                LOG.info("Starting to refresh ServiceInstanceSet " + System.identityHashCode(this));
                for (ServiceRecord rec : records.values()) {
                    DynamicServiceInstance instance = new DynamicServiceInstance(rec);
                    String identity = instance.getWorkerIdentity();
                    if (!this.instances.containsKey(identity)) {
                        freshInstances.put(identity, instance);
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Adding new worker " + identity + " which mapped to " + instance);
                        }
                    } else if (LOG.isDebugEnabled()) {
                        LOG.debug("Retaining running worker " + identity + " which mapped to " + instance);
                    }
                    latestKeys.add(identity);
                }
                // Whatever is known but no longer listed is dead. The leftover set must be
                // processed regardless of removeAll's return value: that value is false when
                // nothing was retained, which is exactly the case where every worker is dead.
                Set<String> deadKeys = new HashSet<String>(this.instances.keySet());
                deadKeys.removeAll(latestKeys);
                for (String k : deadKeys) {
                    DynamicServiceInstance dead = (DynamicServiceInstance)this.instances.remove(k);
                    dead.kill();
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Deleting dead worker " + k + " which mapped to " + dead);
                    }
                }
                this.instances.putAll(freshInstances);
            }
            finally {
                this.writeLock.unlock();
            }
        }

        /**
         * Returns the instances whose host equals {@code host}.
         *
         * @param host host name to match; must not be {@code null}
         * @return a new set, empty when no instance matches
         */
        @Override
        public Set<ServiceInstance> getByHost(String host) {
            HashSet<ServiceInstance> byHost = new HashSet<ServiceInstance>();
            this.readLock.lock();
            try {
                for (ServiceInstance i : this.instances.values()) {
                    if (host.equals(i.getHost())) {
                        byHost.add(i);
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Locality comparing " + host + " to " + i.getHost());
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Returning " + byHost.size() + " hosts for locality allocation on " + host);
                }
                return byHost;
            }
            finally {
                this.readLock.unlock();
            }
        }
    }

    /**
     * {@link ServiceInstance} backed by one {@link ServiceRecord}. Host and ports are parsed
     * from the first address of the record's internal endpoints at construction time.
     */
    private static final class DynamicServiceInstance
    implements ServiceInstance {
        private final ServiceRecord srv;
        // Written by whichever thread runs refresh(), read by holders of the instance.
        private volatile boolean alive = true;
        private final String host;
        private final int rpcPort;
        private final int mngPort;
        private final int shufflePort;

        /**
         * @param srv record to wrap; it is expected to carry the {@code "llap"},
         *        {@code "llapmng"} and {@code "shuffle"} internal endpoints
         * @throws IOException if an address field cannot be read from the record
         */
        public DynamicServiceInstance(ServiceRecord srv) throws IOException {
            this.srv = srv;
            Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
            Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
            Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
            this.host = RegistryTypeUtils.getAddressField(rpc.addresses.get(0), "host");
            this.rpcPort = Integer.parseInt(RegistryTypeUtils.getAddressField(rpc.addresses.get(0), "port"));
            this.mngPort = Integer.parseInt(RegistryTypeUtils.getAddressField(mng.addresses.get(0), "port"));
            this.shufflePort = Integer.parseInt(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0), "port"));
        }

        /** Returns the record's {@code "llap.unique.id"} attribute. */
        @Override
        public String getWorkerIdentity() {
            return this.srv.get(UNIQUE_IDENTIFIER);
        }

        @Override
        public String getHost() {
            return this.host;
        }

        @Override
        public int getRpcPort() {
            return this.rpcPort;
        }

        @Override
        public int getShufflePort() {
            return this.shufflePort;
        }

        /** Returns {@code true} until {@link #kill()} has been called. */
        @Override
        public boolean isAlive() {
            return this.alive;
        }

        /** Marks this instance as dead; the object itself stays usable by existing holders. */
        public void kill() {
            LOG.info("Killing service instance: " + this);
            this.alive = false;
        }

        /** Returns the attributes of the underlying record. */
        @Override
        public Map<String, String> getProperties() {
            return this.srv.attributes();
        }

        /**
         * Builds a {@link Resource} from the memory-per-instance and number-of-executors
         * attributes published in the record.
         *
         * @throws NumberFormatException if either attribute is missing or not an integer
         */
        @Override
        public Resource getResource() {
            int memory = Integer.parseInt(this.srv.get(HiveConf.ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname));
            int vCores = Integer.parseInt(this.srv.get(HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname));
            return Resource.newInstance(memory, vCores);
        }

        @Override
        public String toString() {
            return "DynamicServiceInstance [alive=" + this.alive + ", host=" + this.host + ":" + this.rpcPort + " with resources=" + this.getResource() + "]";
        }

        @Override
        public int getManagementPort() {
            return this.mngPort;
        }
    }
}

