/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.coord.zk;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import oadd.com.google.common.base.Function;
import oadd.com.google.common.base.Throwables;
import oadd.com.google.common.collect.Collections2;
import oadd.org.apache.commons.collections.keyvalue.MultiKey;
import oadd.org.apache.curator.framework.CuratorFramework;
import oadd.org.apache.curator.framework.CuratorFrameworkFactory;
import oadd.org.apache.curator.framework.api.ACLProvider;
import oadd.org.apache.curator.framework.imps.DefaultACLProvider;
import oadd.org.apache.curator.framework.state.ConnectionState;
import oadd.org.apache.curator.framework.state.ConnectionStateListener;
import oadd.org.apache.curator.retry.RetryNTimes;
import oadd.org.apache.curator.x.discovery.ServiceCache;
import oadd.org.apache.curator.x.discovery.ServiceDiscovery;
import oadd.org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import oadd.org.apache.curator.x.discovery.ServiceInstance;
import oadd.org.apache.curator.x.discovery.details.ServiceCacheListener;
import oadd.org.apache.drill.common.AutoCloseables;
import oadd.org.apache.drill.common.config.DrillConfig;
import oadd.org.apache.drill.exec.coord.ClusterCoordinator;
import oadd.org.apache.drill.exec.coord.DistributedSemaphore;
import oadd.org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import oadd.org.apache.drill.exec.coord.store.CachingTransientStoreFactory;
import oadd.org.apache.drill.exec.coord.store.TransientStore;
import oadd.org.apache.drill.exec.coord.store.TransientStoreConfig;
import oadd.org.apache.drill.exec.coord.store.TransientStoreFactory;
import oadd.org.apache.drill.exec.coord.zk.ZKRegistrationHandle;
import oadd.org.apache.drill.exec.coord.zk.ZkDistributedSemaphore;
import oadd.org.apache.drill.exec.coord.zk.ZkEphemeralStore;
import oadd.org.apache.drill.exec.coord.zk.ZkTransientStoreFactory;
import oadd.org.apache.drill.exec.proto.CoordinationProtos;
import oadd.org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKClusterCoordinator
extends ClusterCoordinator {
    static final Logger logger = LoggerFactory.getLogger(ZKClusterCoordinator.class);
    private CuratorFramework curator;
    private ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> discovery;
    private volatile Collection<CoordinationProtos.DrillbitEndpoint> endpoints = Collections.emptyList();
    private final String serviceName;
    private final CountDownLatch initialConnection = new CountDownLatch(1);
    private final TransientStoreFactory factory;
    private ServiceCache<CoordinationProtos.DrillbitEndpoint> serviceCache;
    private CoordinationProtos.DrillbitEndpoint endpoint;
    private ConcurrentHashMap<MultiKey, CoordinationProtos.DrillbitEndpoint> endpointsMap = new ConcurrentHashMap();
    private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

    public ZKClusterCoordinator(DrillConfig config, String connect) throws Exception {
        this(config, connect, false, new DefaultACLProvider());
    }

    public ZKClusterCoordinator(DrillConfig config, ACLProvider aclProvider) throws Exception {
        this(config, null, true, aclProvider);
    }

    public ZKClusterCoordinator(DrillConfig config, String connect, boolean createNamespace, ACLProvider aclProvider) throws Exception {
        connect = connect == null || connect.isEmpty() ? config.getString("drill.exec.zk.connect") : connect;
        String clusterId = config.getString("drill.exec.cluster-id");
        String zkRoot = config.getString("drill.exec.zk.root");
        Matcher m4 = ZK_COMPLEX_STRING.matcher(connect);
        if (m4.matches()) {
            connect = m4.group(1);
            zkRoot = m4.group(2);
            clusterId = m4.group(3);
        }
        logger.debug("Connect {}, zkRoot {}, clusterId: " + clusterId, (Object)connect, (Object)zkRoot);
        this.serviceName = clusterId;
        RetryNTimes rp = new RetryNTimes(config.getInt("drill.exec.zk.retry.count"), config.getInt("drill.exec.zk.retry.delay"));
        CuratorFrameworkFactory.Builder curatorBuilder = CuratorFrameworkFactory.builder().connectionTimeoutMs(config.getInt("drill.exec.zk.timeout")).retryPolicy(rp).connectString(connect).aclProvider(aclProvider);
        if (!createNamespace) {
            try (CuratorFramework client = curatorBuilder.build();){
                client.start();
                Stat stat = (Stat)client.checkExists().forPath("/" + zkRoot);
                Objects.requireNonNull(stat, "The root path does not exist in the Zookeeper.");
            }
        }
        this.curator = curatorBuilder.namespace(zkRoot).build();
        this.curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
        this.curator.start();
        this.discovery = this.newDiscovery();
        this.factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(this.curator));
    }

    public CuratorFramework getCurator() {
        return this.curator;
    }

    @Override
    public void start(long millisToWait) throws Exception {
        logger.debug("Starting ZKClusterCoordination.");
        this.discovery.start();
        if (millisToWait != 0L) {
            boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS);
            if (!success) {
                throw new IOException(String.format("Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.", millisToWait));
            }
        } else {
            this.initialConnection.await();
        }
        this.serviceCache = this.discovery.serviceCacheBuilder().name(this.serviceName).build();
        this.serviceCache.addListener((CoordinationProtos.DrillbitEndpoint)((Object)new EndpointListener()));
        this.serviceCache.start();
        this.updateEndpoints();
    }

    @Override
    public void close() throws Exception {
        AutoCloseables.close(this.serviceCache, this.discovery, this.factory, this.curator);
    }

    @Override
    public ClusterCoordinator.RegistrationHandle register(CoordinationProtos.DrillbitEndpoint data) {
        try {
            data = data.toBuilder().setState(CoordinationProtos.DrillbitEndpoint.State.ONLINE).build();
            ServiceInstance<CoordinationProtos.DrillbitEndpoint> serviceInstance = this.newServiceInstance(data);
            this.discovery.registerService(serviceInstance);
            return new ZKRegistrationHandle(serviceInstance.getId(), data);
        }
        catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void unregister(ClusterCoordinator.RegistrationHandle handle) {
        if (!(handle instanceof ZKRegistrationHandle)) {
            throw new UnsupportedOperationException("Unknown handle type: " + handle.getClass().getName());
        }
        this.listeners.clear();
        ZKRegistrationHandle h2 = (ZKRegistrationHandle)handle;
        try {
            ServiceInstance serviceInstance = ServiceInstance.builder().address("").port(0).id(h2.id).name(this.serviceName).build();
            this.discovery.unregisterService(serviceInstance);
        }
        catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public ClusterCoordinator.RegistrationHandle update(ClusterCoordinator.RegistrationHandle handle, CoordinationProtos.DrillbitEndpoint.State state) {
        ZKRegistrationHandle h2 = (ZKRegistrationHandle)handle;
        try {
            this.endpoint = h2.endpoint.toBuilder().setState(state).build();
            ServiceInstance<CoordinationProtos.DrillbitEndpoint> serviceInstance = ServiceInstance.builder().name(this.serviceName).id(h2.id).payload(this.endpoint).build();
            this.discovery.updateService(serviceInstance);
        }
        catch (Exception e) {
            Throwables.throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
        return handle;
    }

    @Override
    public Collection<CoordinationProtos.DrillbitEndpoint> getAvailableEndpoints() {
        return this.endpoints;
    }

    @Override
    public Collection<CoordinationProtos.DrillbitEndpoint> getOnlineEndPoints() {
        ArrayList<CoordinationProtos.DrillbitEndpoint> runningEndPoints = new ArrayList<CoordinationProtos.DrillbitEndpoint>();
        for (CoordinationProtos.DrillbitEndpoint endpoint : this.endpoints) {
            if (!this.isDrillbitInState(endpoint, CoordinationProtos.DrillbitEndpoint.State.ONLINE)) continue;
            runningEndPoints.add(endpoint);
        }
        logger.debug("Online endpoints in ZK are" + ((Object)runningEndPoints).toString());
        return runningEndPoints;
    }

    @Override
    public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
        return new ZkDistributedSemaphore(this.curator, "/semaphore/" + name, maximumLeases);
    }

    @Override
    public <V> TransientStore<V> getOrCreateTransientStore(TransientStoreConfig<V> config) {
        ZkEphemeralStore store = (ZkEphemeralStore)this.factory.getOrCreateStore(config);
        return store;
    }

    private synchronized void updateEndpoints() {
        try {
            Collection<CoordinationProtos.DrillbitEndpoint> newDrillbitSet = Collections2.transform(this.discovery.queryForInstances(this.serviceName), new Function<ServiceInstance<CoordinationProtos.DrillbitEndpoint>, CoordinationProtos.DrillbitEndpoint>(){

                @Override
                public CoordinationProtos.DrillbitEndpoint apply(ServiceInstance<CoordinationProtos.DrillbitEndpoint> input) {
                    return input.getPayload();
                }
            });
            HashSet<CoordinationProtos.DrillbitEndpoint> unregisteredBits = new HashSet<CoordinationProtos.DrillbitEndpoint>();
            HashSet<CoordinationProtos.DrillbitEndpoint> registeredBits = new HashSet<CoordinationProtos.DrillbitEndpoint>();
            for (CoordinationProtos.DrillbitEndpoint endpoint : newDrillbitSet) {
                int endpointPort;
                String endpointAddress = endpoint.getAddress();
                if (!this.endpointsMap.containsKey(new MultiKey(endpointAddress, endpointPort = endpoint.getUserPort()))) {
                    registeredBits.add(endpoint);
                }
                this.endpointsMap.put(new MultiKey(endpointAddress, endpointPort), endpoint);
            }
            for (MultiKey key : this.endpointsMap.keySet()) {
                if (newDrillbitSet.contains(this.endpointsMap.get(key))) continue;
                unregisteredBits.add(this.endpointsMap.get(key));
                this.endpointsMap.remove(key);
            }
            this.endpoints = this.endpointsMap.values();
            if (logger.isDebugEnabled()) {
                StringBuilder builder = new StringBuilder();
                builder.append("Active drillbit set changed.  Now includes ");
                builder.append(newDrillbitSet.size());
                builder.append(" total bits. New active drillbits:\n");
                builder.append("Address | User Port | Control Port | Data Port | Version | State\n");
                for (CoordinationProtos.DrillbitEndpoint bit : newDrillbitSet) {
                    builder.append(bit.getAddress()).append(" | ");
                    builder.append(bit.getUserPort()).append(" | ");
                    builder.append(bit.getControlPort()).append(" | ");
                    builder.append(bit.getDataPort()).append(" | ");
                    builder.append(bit.getVersion()).append(" |");
                    builder.append(bit.getState()).append(" | ");
                    builder.append('\n');
                }
                logger.debug(builder.toString());
            }
            if (!unregisteredBits.isEmpty()) {
                this.drillbitUnregistered(unregisteredBits);
            }
            if (!registeredBits.isEmpty()) {
                this.drillbitRegistered(registeredBits);
            }
        }
        catch (Exception e) {
            logger.error("Failure while update Drillbit service location cache.", e);
        }
    }

    protected ServiceInstance<CoordinationProtos.DrillbitEndpoint> newServiceInstance(CoordinationProtos.DrillbitEndpoint endpoint) throws Exception {
        return ServiceInstance.builder().name(this.serviceName).payload(endpoint).build();
    }

    protected ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> newDiscovery() {
        return ServiceDiscoveryBuilder.builder(CoordinationProtos.DrillbitEndpoint.class).basePath("/").client(this.curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build();
    }

    private class EndpointListener
    implements ServiceCacheListener {
        private EndpointListener() {
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
        }

        @Override
        public void cacheChanged() {
            logger.debug("Got cache changed --> updating endpoints");
            ZKClusterCoordinator.this.updateEndpoints();
        }
    }

    private class InitialConnectionListener
    implements ConnectionStateListener {
        private InitialConnectionListener() {
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED) {
                ZKClusterCoordinator.this.initialConnection.countDown();
                client.getConnectionStateListenable().removeListener(this);
            }
        }
    }
}

