package oadd.org.apache.drill.exec.rpc.user.clusterclient.zkbased;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import oadd.com.google.common.base.Function;
import oadd.com.google.common.collect.Collections2;
import oadd.com.google.common.collect.Lists;
import oadd.org.apache.curator.framework.CuratorFramework;
import oadd.org.apache.curator.framework.CuratorFrameworkFactory;
import oadd.org.apache.curator.framework.state.ConnectionState;
import oadd.org.apache.curator.framework.state.ConnectionStateListener;
import oadd.org.apache.curator.retry.RetryNTimes;
import oadd.org.apache.curator.x.discovery.ServiceCache;
import oadd.org.apache.curator.x.discovery.ServiceDiscovery;
import oadd.org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import oadd.org.apache.curator.x.discovery.ServiceInstance;
import oadd.org.apache.curator.x.discovery.details.ServiceCacheListener;
import oadd.org.apache.drill.common.AutoCloseables;
import oadd.org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import oadd.org.apache.drill.exec.proto.CoordinationProtos;
import oadd.org.apache.drill.exec.rpc.user.clusterclient.EndpointProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:oadd/org/apache/drill/exec/rpc/user/clusterclient/zkbased/ZKBasedEndpointProvider.class */
/**
 * {@link EndpointProvider} that discovers Drillbit endpoints through a ZooKeeper-backed
 * Curator service discovery and hands out one of them at random.
 *
 * <p>The current endpoint list is kept in a {@code volatile} field that is replaced wholesale
 * every time the service cache reports a change; a published list is never mutated by this
 * class afterwards, so readers only need a single volatile read.
 */
public class ZKBasedEndpointProvider implements EndpointProvider {
    private static final Logger logger = LoggerFactory.getLogger(ZKBasedEndpointProvider.class);

    /**
     * Splits {@code <connect string>/<zk root>/<cluster id>}: group 1 is everything before the
     * first {@code '/'}, group 3 is the last path segment, group 2 is whatever lies in between.
     */
    private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$");

    private final CuratorFramework curator;
    private final ServiceDiscovery<CoordinationProtos.DrillbitEndpoint> discovery;
    private final ServiceCache<CoordinationProtos.DrillbitEndpoint> serviceCache;
    private final Random random = new Random();

    /**
     * Latest known endpoints. Starts out empty (never {@code null}) so that {@link #getEndpoint()}
     * stays usable even when the initial ZooKeeper query fails.
     */
    private volatile List<CoordinationProtos.DrillbitEndpoint> endpoints = new ArrayList<>();

    /** Fluent builder collecting the ZooKeeper connection settings for a provider. */
    public static class Builder {
        /** Number of retries handed to the {@link RetryNTimes} policy. */
        int retryTimes = 5;
        /** Second argument handed to the {@link RetryNTimes} policy (delay between retries). */
        int retryDelay = 500;
        /** Value passed to Curator's {@code connectionTimeoutMs}. */
        int timeoutMs = 5000;
        String connectionString;
        String zkRoot;
        String clusterId;

        /**
         * Populates connection string, ZK root and cluster id from a combined
         * {@code <connect string>/<zk root>/<cluster id>} string.
         *
         * <p>If {@code str} does not have that shape, nothing is changed; {@link #build()} will
         * then fail unless the three values are supplied through the dedicated setters.
         *
         * @param str combined URL-like string
         * @return this builder
         */
        public Builder withUrl(String str) {
            Matcher matcher = ZKBasedEndpointProvider.ZK_COMPLEX_STRING.matcher(str);
            if (matcher.matches()) {
                this.connectionString = matcher.group(1);
                this.zkRoot = matcher.group(2);
                this.clusterId = matcher.group(3);
            }
            return this;
        }

        /**
         * @param i number of retries for the Curator retry policy
         * @return this builder
         */
        public Builder withRetryTimes(int i) {
            this.retryTimes = i;
            return this;
        }

        /**
         * @param i delay value for the Curator retry policy
         * @return this builder
         */
        public Builder withRetryDelay(int i) {
            this.retryDelay = i;
            return this;
        }

        /**
         * @param i Curator connection timeout in milliseconds
         * @return this builder
         */
        public Builder withTimeoutMs(int i) {
            this.timeoutMs = i;
            return this;
        }

        /**
         * @param str ZooKeeper connect string
         * @return this builder
         */
        public Builder withConnectionString(String str) {
            this.connectionString = str;
            return this;
        }

        /**
         * @param str ZooKeeper root, used as the Curator namespace
         * @return this builder
         */
        public Builder withZKRoot(String str) {
            this.zkRoot = str;
            return this;
        }

        /**
         * @param str cluster id, used as the discovery service name
         * @return this builder
         */
        public Builder withClusterId(String str) {
            this.clusterId = str;
            return this;
        }

        /**
         * Creates the provider, connecting to ZooKeeper in the process.
         *
         * @return a started provider
         * @throws IllegalArgumentException if connection string, ZK root or cluster id is missing
         * @throws Exception if starting the ZooKeeper client, discovery or cache fails
         */
        public ZKBasedEndpointProvider build() throws Exception {
            if (this.connectionString == null || this.zkRoot == null || this.clusterId == null) {
                throw new IllegalArgumentException(
                        "connectionString, zkRoot and clusterId must all be set (connectionString="
                                + this.connectionString + ", zkRoot=" + this.zkRoot
                                + ", clusterId=" + this.clusterId + ")");
            }
            return new ZKBasedEndpointProvider(this);
        }
    }

    /**
     * @return a fresh {@link Builder} with default retry and timeout settings
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Starts the Curator client, service discovery and service cache, waits for the first
     * {@code CONNECTED} state and loads the initial endpoint list.
     *
     * <p>If any step fails, everything started so far is closed before the exception is rethrown.
     *
     * @param builder source of the connection settings
     * @throws Exception if a ZooKeeper component fails to start or the waiting thread is interrupted
     */
    ZKBasedEndpointProvider(final Builder builder) throws Exception {
        final String clusterId = builder.clusterId;
        // Resources started so far, most recent first, so a failure can release them.
        final List<AutoCloseable> started = new ArrayList<>();
        try {
            this.curator = CuratorFrameworkFactory.builder()
                    .namespace(builder.zkRoot)
                    .connectionTimeoutMs(builder.timeoutMs)
                    .retryPolicy(new RetryNTimes(builder.retryTimes, builder.retryDelay))
                    .connectString(builder.connectionString)
                    .build();
            started.add(0, this.curator);

            final CountDownLatch connected = new CountDownLatch(1);
            // Registered before start() so the first CONNECTED event cannot be missed.
            this.curator.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    if (connectionState == ConnectionState.CONNECTED) {
                        connected.countDown();
                        // One-shot listener: only the first connection matters here.
                        curatorFramework.getConnectionStateListenable().removeListener(this);
                    }
                }
            });
            this.curator.start();

            this.discovery = ServiceDiscoveryBuilder.builder(CoordinationProtos.DrillbitEndpoint.class)
                    .basePath("/")
                    .client(this.curator)
                    .serializer(DrillServiceInstanceHelper.SERIALIZER)
                    .build();
            started.add(0, this.discovery);
            this.discovery.start();

            // NOTE(review): this wait is unbounded; if CONNECTED is never reported the constructor
            // blocks until the thread is interrupted. Consider bounding it once callers agree on
            // a timeout policy.
            connected.await();

            this.serviceCache = this.discovery.serviceCacheBuilder().name(clusterId).build();
            started.add(0, this.serviceCache);
            this.serviceCache.addListener(new ServiceCacheListener() {
                @Override
                public void cacheChanged() {
                    ZKBasedEndpointProvider.this.updateEndpoints(clusterId);
                }

                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    // Connection state changes are not acted upon here.
                }
            });
            this.serviceCache.start();
            updateEndpoints(clusterId);
        } catch (Exception e) {
            // Do not leak ZooKeeper clients when construction fails half-way
            // (this includes an interrupt while waiting for the connection).
            try {
                AutoCloseables.close(started.toArray(new AutoCloseable[0]));
            } catch (Exception closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Re-queries service discovery for the instances registered under {@code str} and publishes
     * their payloads as the new endpoint list.
     *
     * <p>Failures are logged and otherwise ignored, leaving the previous list in place.
     *
     * @param str service name (cluster id) to query
     */
    public void updateEndpoints(String str) {
        try {
            ArrayList<CoordinationProtos.DrillbitEndpoint> refreshed = Lists.newArrayList(Collections2.transform(
                    this.discovery.queryForInstances(str),
                    new Function<ServiceInstance<CoordinationProtos.DrillbitEndpoint>, CoordinationProtos.DrillbitEndpoint>() {
                        @Override
                        public CoordinationProtos.DrillbitEndpoint apply(ServiceInstance<CoordinationProtos.DrillbitEndpoint> serviceInstance) {
                            return serviceInstance.getPayload();
                        }
                    }));
            // Single volatile write publishes the fully built list; it is not modified afterwards.
            this.endpoints = refreshed;
        } catch (Exception e) {
            logger.error("Unexpected failure while requesting ZooKeeper for list of bits.", e);
        }
    }

    /**
     * Picks one of the currently known endpoints at random.
     *
     * @return a randomly chosen endpoint, or {@code null} when none is known
     */
    @Override
    public CoordinationProtos.DrillbitEndpoint getEndpoint() {
        // Read the volatile field once so the emptiness check and the index refer to the same list.
        final List<CoordinationProtos.DrillbitEndpoint> snapshot = this.endpoints;
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(this.random.nextInt(snapshot.size()));
    }

    /**
     * Closes the service cache, the service discovery and the Curator client. Exceptions raised
     * while closing are logged at WARN level and not propagated.
     */
    @Override
    public void close() {
        try {
            AutoCloseables.close(this.serviceCache, this.discovery, this.curator);
        } catch (Exception e) {
            logger.warn("Ignoring exception(s) while closing ZK resources", e);
        }
    }
}
