/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.rpc.user.clusterclient.zkbased;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import oadd.org.apache.curator.framework.CuratorFramework;
import oadd.org.apache.curator.framework.CuratorFrameworkFactory;
import oadd.org.apache.curator.framework.state.ConnectionState;
import oadd.org.apache.curator.framework.state.ConnectionStateListener;
import oadd.org.apache.curator.retry.RetryNTimes;
import oadd.org.apache.curator.x.discovery.ServiceCache;
import oadd.org.apache.curator.x.discovery.ServiceDiscovery;
import oadd.org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import oadd.org.apache.curator.x.discovery.ServiceInstance;
import oadd.org.apache.curator.x.discovery.details.ServiceCacheListener;
import oadd.org.apache.drill.common.AutoCloseables;
import oadd.org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import oadd.org.apache.drill.exec.proto.CoordinationProtos;
import oadd.org.apache.drill.exec.rpc.user.clusterclient.EndpointProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.collect.Collections2;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKBasedEndpointProvider
implements EndpointProvider {
    private static final Logger logger = LoggerFactory.getLogger(ZKBasedEndpointProvider.class);
    private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");
    private final CuratorFramework curator;
    private final ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> discovery;
    private final ServiceCache<CoordinationProtos.DrillbitEndpoint> serviceCache;
    private final Random random = new Random();
    private volatile List<CoordinationProtos.DrillbitEndpoint> endpoints;

    public static Builder newBuilder() {
        return new Builder();
    }

    ZKBasedEndpointProvider(final Builder builder) throws Exception {
        this.curator = CuratorFrameworkFactory.builder().namespace(builder.zkRoot).connectionTimeoutMs(builder.timeoutMs).retryPolicy(new RetryNTimes(builder.retryTimes, builder.retryDelay)).connectString(builder.connectionString).build();
        final CountDownLatch latch = new CountDownLatch(1);
        this.curator.getConnectionStateListenable().addListener(new ConnectionStateListener(){

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (newState == ConnectionState.CONNECTED) {
                    latch.countDown();
                    client.getConnectionStateListenable().removeListener(this);
                }
            }
        });
        this.curator.start();
        this.discovery = ServiceDiscoveryBuilder.builder(CoordinationProtos.DrillbitEndpoint.class).basePath("/").client(this.curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build();
        this.discovery.start();
        latch.await();
        this.serviceCache = this.discovery.serviceCacheBuilder().name(builder.clusterId).build();
        this.serviceCache.addListener((CoordinationProtos.DrillbitEndpoint)((Object)new ServiceCacheListener(){
            final String clusterId;
            {
                this.clusterId = builder.clusterId;
            }

            @Override
            public void cacheChanged() {
                ZKBasedEndpointProvider.this.updateEndpoints(this.clusterId);
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
            }
        }));
        this.serviceCache.start();
        this.updateEndpoints(builder.clusterId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateEndpoints(String clusterId) {
        try {
            ArrayList<CoordinationProtos.DrillbitEndpoint> list = Lists.newArrayList(Collections2.transform(this.discovery.queryForInstances(clusterId), new Function<ServiceInstance<CoordinationProtos.DrillbitEndpoint>, CoordinationProtos.DrillbitEndpoint>(){

                @Override
                public CoordinationProtos.DrillbitEndpoint apply(ServiceInstance<CoordinationProtos.DrillbitEndpoint> input) {
                    return input.getPayload();
                }
            }));
            ZKBasedEndpointProvider zKBasedEndpointProvider = this;
            synchronized (zKBasedEndpointProvider) {
                this.endpoints = list;
            }
        }
        catch (Exception e) {
            logger.error("Unexpected failure while requesting ZooKeeper for list of bits.", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CoordinationProtos.DrillbitEndpoint getEndpoint() {
        ZKBasedEndpointProvider zKBasedEndpointProvider = this;
        synchronized (zKBasedEndpointProvider) {
            if (this.endpoints.size() == 0) {
                return null;
            }
            return this.endpoints.get(this.random.nextInt(this.endpoints.size()));
        }
    }

    @Override
    public void close() {
        try {
            AutoCloseables.close(this.serviceCache, this.discovery, this.curator);
        }
        catch (Exception e) {
            logger.warn("Ignoring exception(s) while closing ZK resources", e);
        }
    }

    public static class Builder {
        int retryTimes = 5;
        int retryDelay = 500;
        int timeoutMs = 5000;
        String connectionString;
        String zkRoot;
        String clusterId;

        public Builder withUrl(String url) {
            Matcher matcher = ZK_COMPLEX_STRING.matcher(url);
            if (matcher.matches()) {
                this.connectionString = matcher.group(1);
                this.zkRoot = matcher.group(2);
                this.clusterId = matcher.group(3);
            }
            return this;
        }

        public Builder withRetryTimes(int retryTimes) {
            this.retryTimes = retryTimes;
            return this;
        }

        public Builder withRetryDelay(int retryDelay) {
            this.retryDelay = retryDelay;
            return this;
        }

        public Builder withTimeoutMs(int timeoutMs) {
            this.timeoutMs = timeoutMs;
            return this;
        }

        public Builder withConnectionString(String connectionString) {
            this.connectionString = connectionString;
            return this;
        }

        public Builder withZKRoot(String zkRoot) {
            this.zkRoot = zkRoot;
            return this;
        }

        public Builder withClusterId(String clusterId) {
            this.clusterId = clusterId;
            return this;
        }

        public ZKBasedEndpointProvider build() throws Exception {
            if (this.connectionString == null || this.zkRoot == null || this.clusterId == null) {
                throw new IllegalArgumentException();
            }
            return new ZKBasedEndpointProvider(this);
        }
    }
}

