/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.curator.discovery;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.io.druid.concurrent.LifecycleLock;
import org.apache.hive.druid.io.druid.discovery.DiscoveryDruidNode;
import org.apache.hive.druid.io.druid.discovery.DruidNodeDiscovery;
import org.apache.hive.druid.io.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.hive.druid.io.druid.guice.ManageLifecycle;
import org.apache.hive.druid.io.druid.guice.annotations.Json;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.StringUtils;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.hive.druid.io.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.hive.druid.io.druid.java.util.common.logger.Logger;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;

@ManageLifecycle
public class CuratorDruidNodeDiscoveryProvider
extends DruidNodeDiscoveryProvider {
    private static final Logger log = new Logger(CuratorDruidNodeDiscoveryProvider.class);
    private final CuratorFramework curatorFramework;
    private final ZkPathsConfig config;
    private final ObjectMapper jsonMapper;
    private ExecutorService listenerExecutor;
    private final Map<String, NodeTypeWatcher> nodeTypeWatchers = new ConcurrentHashMap<String, NodeTypeWatcher>();
    private final LifecycleLock lifecycleLock = new LifecycleLock();

    @Inject
    public CuratorDruidNodeDiscoveryProvider(CuratorFramework curatorFramework, ZkPathsConfig config, @Json ObjectMapper jsonMapper) {
        this.curatorFramework = curatorFramework;
        this.config = config;
        this.jsonMapper = jsonMapper;
    }

    @Override
    public DruidNodeDiscovery getForNodeType(String nodeType) {
        Preconditions.checkState(this.lifecycleLock.awaitStarted(1L, TimeUnit.MILLISECONDS));
        return this.nodeTypeWatchers.compute(nodeType, (k, v) -> {
            if (v != null) {
                return v;
            }
            log.info("Creating NodeTypeWatcher for nodeType [%s].", nodeType);
            NodeTypeWatcher nodeTypeWatcher = new NodeTypeWatcher(this.listenerExecutor, this.curatorFramework, this.config.getInternalDiscoveryPath(), this.jsonMapper, nodeType);
            nodeTypeWatcher.start();
            log.info("Created NodeTypeWatcher for nodeType [%s].", nodeType);
            return nodeTypeWatcher;
        });
    }

    @LifecycleStart
    public void start() {
        if (!this.lifecycleLock.canStart()) {
            throw new ISE("can't start.", new Object[0]);
        }
        try {
            log.info("starting", new Object[0]);
            this.listenerExecutor = Execs.singleThreaded("CuratorDruidNodeDiscoveryProvider-ListenerExecutor");
            log.info("started", new Object[0]);
            this.lifecycleLock.started();
        }
        finally {
            this.lifecycleLock.exitStart();
        }
    }

    @LifecycleStop
    public void stop() {
        if (!this.lifecycleLock.canStop()) {
            throw new ISE("can't stop.", new Object[0]);
        }
        log.info("stopping", new Object[0]);
        for (NodeTypeWatcher watcher : this.nodeTypeWatchers.values()) {
            watcher.stop();
        }
        this.listenerExecutor.shutdownNow();
        log.info("stopped", new Object[0]);
    }

    private static class NodeTypeWatcher
    implements DruidNodeDiscovery {
        private static final Logger log = new Logger(NodeTypeWatcher.class);
        private final CuratorFramework curatorFramework;
        private final String nodeType;
        private final ObjectMapper jsonMapper;
        private final Map<String, DiscoveryDruidNode> nodes = new ConcurrentHashMap<String, DiscoveryDruidNode>();
        private final PathChildrenCache cache;
        private final ExecutorService cacheExecutor;
        private final ExecutorService listenerExecutor;
        private final List<DruidNodeDiscovery.Listener> nodeListeners = new ArrayList<DruidNodeDiscovery.Listener>();
        private final Object lock = new Object();
        private CountDownLatch cacheInitialized = new CountDownLatch(1);

        NodeTypeWatcher(ExecutorService listenerExecutor, CuratorFramework curatorFramework, String basePath, ObjectMapper jsonMapper, String nodeType) {
            this.listenerExecutor = listenerExecutor;
            this.curatorFramework = curatorFramework;
            this.nodeType = nodeType;
            this.jsonMapper = jsonMapper;
            this.cacheExecutor = Execs.singleThreaded(StringUtils.format("NodeTypeWatcher[%s]", nodeType));
            this.cache = new PathChildrenCache(curatorFramework, ZKPaths.makePath((String)basePath, (String)nodeType), true, true, this.cacheExecutor);
        }

        @Override
        public Collection<DiscoveryDruidNode> getAllNodes() {
            if (!this.isCacheInitialized(30L, TimeUnit.SECONDS)) {
                log.info("cache is not initialized yet. getAllNodes() might not return full information.", new Object[0]);
            }
            return Collections.unmodifiableCollection(this.nodes.values());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void registerListener(DruidNodeDiscovery.Listener listener) {
            Object object = this.lock;
            synchronized (object) {
                if (this.isCacheInitialized(1L, TimeUnit.MICROSECONDS)) {
                    ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(this.nodes.values());
                    this.safeSchedule(() -> listener.nodesAdded(currNodes), "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, listener);
                }
                this.nodeListeners.add(listener);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleChildEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            Object object = this.lock;
            synchronized (object) {
                try {
                    switch (event.getType()) {
                        case CHILD_ADDED: {
                            byte[] data;
                            try {
                                data = (byte[])((GetDataWatchBackgroundStatable)this.curatorFramework.getData().decompressed()).forPath(event.getData().getPath());
                            }
                            catch (Exception ex) {
                                log.error(ex, "Failed to get data for path [%s]. Ignoring event [%s].", event.getData().getPath(), event.getType());
                                return;
                            }
                            DiscoveryDruidNode druidNode = this.jsonMapper.readValue(data, DiscoveryDruidNode.class);
                            if (!this.nodeType.equals(druidNode.getNodeType())) {
                                log.warn("Node[%s:%s] add is discovered by node watcher of nodeType [%s]. Ignored.", druidNode.getNodeType(), druidNode, this.nodeType);
                                return;
                            }
                            log.info("Received event [%s] for Node[%s:%s].", event.getType(), druidNode.getNodeType(), druidNode);
                            this.addNode(druidNode);
                            break;
                        }
                        case CHILD_REMOVED: {
                            DiscoveryDruidNode druidNode = this.jsonMapper.readValue(event.getData().getData(), DiscoveryDruidNode.class);
                            if (!this.nodeType.equals(druidNode.getNodeType())) {
                                log.warn("Node[%s:%s] removal is discovered by node watcher of nodeType [%s]. Ignored.", druidNode.getNodeType(), druidNode, this.nodeType);
                                return;
                            }
                            log.info("Node[%s:%s] disappeared.", druidNode.getNodeType(), druidNode);
                            this.removeNode(druidNode);
                            break;
                        }
                        case INITIALIZED: {
                            if (this.isCacheInitialized(1L, TimeUnit.MICROSECONDS)) {
                                log.warn("cache is already initialized. ignoring [%s] event, nodeType [%s].", event.getType(), this.nodeType);
                                return;
                            }
                            log.info("Received INITIALIZED in node watcher for type [%s].", this.nodeType);
                            ImmutableList<DiscoveryDruidNode> currNodes = ImmutableList.copyOf(this.nodes.values());
                            for (DruidNodeDiscovery.Listener l : this.nodeListeners) {
                                this.safeSchedule(() -> l.nodesAdded(currNodes), "Exception occured in nodesAdded([%s]) in listener [%s].", currNodes, l);
                            }
                            this.cacheInitialized.countDown();
                            break;
                        }
                        default: {
                            log.info("Ignored event type [%s] for nodeType [%s] watcher.", event.getType(), this.nodeType);
                            break;
                        }
                    }
                }
                catch (Exception ex) {
                    log.error(ex, "unknown error in node watcher for type [%s].", this.nodeType);
                }
            }
        }

        private boolean isCacheInitialized(long waitFor, TimeUnit timeUnit) {
            try {
                return this.cacheInitialized.await(waitFor, timeUnit);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        private void safeSchedule(Runnable runnable, String errMsgFormat, Object ... args) {
            this.listenerExecutor.submit(() -> {
                try {
                    runnable.run();
                }
                catch (Exception ex) {
                    log.error(errMsgFormat, args);
                }
            });
        }

        private void addNode(DiscoveryDruidNode druidNode) {
            DiscoveryDruidNode prev = this.nodes.putIfAbsent(druidNode.getDruidNode().getHostAndPortToUse(), druidNode);
            if (prev == null) {
                if (this.isCacheInitialized(1L, TimeUnit.MICROSECONDS)) {
                    ImmutableList<DiscoveryDruidNode> newNode = ImmutableList.of(druidNode);
                    for (DruidNodeDiscovery.Listener l : this.nodeListeners) {
                        this.safeSchedule(() -> l.nodesAdded(newNode), "Exception occured in nodeAdded(node=[%s]) in listener [%s].", druidNode, l);
                    }
                }
            } else {
                log.warn("Node[%s] discovered but existed already [%s].", druidNode, prev);
            }
        }

        private void removeNode(DiscoveryDruidNode druidNode) {
            DiscoveryDruidNode prev = this.nodes.remove(druidNode.getDruidNode().getHostAndPortToUse());
            if (prev == null) {
                log.warn("Noticed disappearance of unknown druid node [%s:%s].", druidNode.getNodeType(), druidNode);
                return;
            }
            if (this.isCacheInitialized(1L, TimeUnit.MICROSECONDS)) {
                ImmutableList<DiscoveryDruidNode> nodeRemoved = ImmutableList.of(druidNode);
                for (DruidNodeDiscovery.Listener l : this.nodeListeners) {
                    this.safeSchedule(() -> l.nodesRemoved(nodeRemoved), "Exception occured in nodeRemoved(node=[%s]) in listener [%s].", druidNode, l);
                }
            }
        }

        public void start() {
            try {
                this.cache.getListenable().addListener((client, event) -> this.handleChildEvent(client, event));
                this.cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            }
            catch (Exception ex) {
                throw Throwables.propagate(ex);
            }
        }

        public void stop() {
            try {
                this.cache.close();
                this.cacheExecutor.shutdownNow();
            }
            catch (Exception ex) {
                log.error(ex, "Failed to stop node watcher for type [%s].", this.nodeType);
            }
        }
    }
}

