package org.apache.drill.exec.work.foreman.rm;

import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.work.foreman.rm.QueryQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.class */
public class DistributedQueryQueue implements QueryQueue {
    private static final Logger logger = LoggerFactory.getLogger(DistributedQueryQueue.class);
    private long memoryPerNode;
    private SystemOptionManager optionManager;
    private ConfigSet configSet;
    private ClusterCoordinator clusterCoordinator;
    private long nextRefreshTime;
    private long memoryPerSmallQuery;
    private long memoryPerLargeQuery;
    private final StatusAdapter statusAdapter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue$ConfigSet.class */
    public static class ConfigSet {
        private final long queueThreshold;
        private final int queueTimeout;
        private final int largeQueueSize;
        private final int smallQueueSize;
        private final double largeToSmallRatio;
        private final double reserveMemoryRatio;
        private final long minimumOperatorMemory;

        public ConfigSet(SystemOptionManager systemOptionManager) {
            this.queueThreshold = systemOptionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
            this.queueTimeout = (int) systemOptionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
            this.largeQueueSize = (int) systemOptionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
            this.smallQueueSize = (int) systemOptionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
            this.largeToSmallRatio = systemOptionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
            this.reserveMemoryRatio = systemOptionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
            this.minimumOperatorMemory = systemOptionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
        }

        public boolean isSameAs(ConfigSet configSet) {
            return configSet != null && this.queueThreshold == configSet.queueThreshold && this.queueTimeout == configSet.queueTimeout && this.largeQueueSize == configSet.largeQueueSize && this.smallQueueSize == configSet.smallQueueSize && this.largeToSmallRatio == configSet.largeToSmallRatio && this.reserveMemoryRatio == configSet.reserveMemoryRatio && this.minimumOperatorMemory == configSet.minimumOperatorMemory;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue$DistributedQueueLease.class */
    public class DistributedQueueLease implements QueryQueue.QueueLease {
        private final UserBitShared.QueryId queryId;
        private DistributedSemaphore.DistributedLease lease;
        private final String queueName;
        private long queryMemory;

        public DistributedQueueLease(UserBitShared.QueryId queryId, String str, DistributedSemaphore.DistributedLease distributedLease, long j) {
            this.queryId = queryId;
            this.queueName = str;
            this.lease = distributedLease;
            this.queryMemory = j;
        }

        public String toString() {
            return String.format("Lease for %s queue to query %s", this.queueName, QueryIdHelper.getQueryId(this.queryId));
        }

        @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease
        public long queryMemoryPerNode() {
            return this.queryMemory;
        }

        @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease
        public void release() {
            DistributedQueryQueue.this.release(this);
        }

        @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease
        public String queueName() {
            return this.queueName;
        }
    }

    /* loaded from: input_file:org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue$StatusAdapter.class */
    public interface StatusAdapter {
        boolean inShutDown();
    }

    @XmlRootElement
    /* loaded from: input_file:org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue$ZKQueueInfo.class */
    public static class ZKQueueInfo {
        public final int smallQueueSize;
        public final int largeQueueSize;
        public final double queueThreshold;
        public final long memoryPerNode;
        public final long memoryPerSmallQuery;
        public final long memoryPerLargeQuery;

        public ZKQueueInfo(DistributedQueryQueue distributedQueryQueue) {
            this.smallQueueSize = distributedQueryQueue.configSet.smallQueueSize;
            this.largeQueueSize = distributedQueryQueue.configSet.largeQueueSize;
            this.queueThreshold = distributedQueryQueue.configSet.queueThreshold;
            this.memoryPerNode = distributedQueryQueue.memoryPerNode;
            this.memoryPerSmallQuery = distributedQueryQueue.memoryPerSmallQuery;
            this.memoryPerLargeQuery = distributedQueryQueue.memoryPerLargeQuery;
        }
    }

    public DistributedQueryQueue(DrillbitContext drillbitContext, StatusAdapter statusAdapter) {
        this.statusAdapter = statusAdapter;
        this.optionManager = drillbitContext.getOptionManager();
        this.clusterCoordinator = drillbitContext.getClusterCoordinator();
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public void setMemoryPerNode(long j) {
        this.memoryPerNode = j;
        refreshConfig();
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public long defaultQueryMemoryPerNode(double d) {
        return d < ((double) this.configSet.queueThreshold) ? this.memoryPerSmallQuery : this.memoryPerLargeQuery;
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public long minimumOperatorMemory() {
        return this.configSet.minimumOperatorMemory;
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public QueryQueue.QueueLease enqueue(UserBitShared.QueryId queryId, double d) throws QueryQueue.QueryQueueException, QueryQueue.QueueTimeoutException {
        DistributedSemaphore semaphore;
        String str;
        long j;
        try {
            synchronized (this) {
                refreshConfig();
                if (d >= this.configSet.queueThreshold) {
                    semaphore = this.clusterCoordinator.getSemaphore("query.large", this.configSet.largeQueueSize);
                    str = "large";
                    j = this.memoryPerLargeQuery;
                } else {
                    semaphore = this.clusterCoordinator.getSemaphore("query.small", this.configSet.smallQueueSize);
                    str = "small";
                    j = this.memoryPerSmallQuery;
                }
            }
            logger.debug("Query {} with cost {} placed into the {} queue.", new Object[]{QueryIdHelper.getQueryId(queryId), Double.valueOf(d), str});
            DistributedSemaphore.DistributedLease acquire = semaphore.acquire(this.configSet.queueTimeout, TimeUnit.MILLISECONDS);
            if (acquire != null) {
                return new DistributedQueueLease(queryId, str, acquire, j);
            }
            logger.warn("Queue timeout: {} after {} ms. ({} seconds)", new Object[]{str, String.format("%,d", Integer.valueOf(this.configSet.queueTimeout)), Integer.valueOf((int) Math.round(this.configSet.queueTimeout / 1000.0d))});
            throw new QueryQueue.QueueTimeoutException(queryId, str, this.configSet.queueTimeout);
        } catch (Exception e) {
            logger.error("Unable to acquire slot for query " + QueryIdHelper.getQueryId(queryId), e);
            throw new QueryQueue.QueryQueueException("Unable to acquire slot for query.", e);
        }
    }

    private synchronized void refreshConfig() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis < this.nextRefreshTime) {
            return;
        }
        this.nextRefreshTime = currentTimeMillis + 5000;
        ConfigSet configSet = new ConfigSet(this.optionManager);
        if (configSet.isSameAs(this.configSet)) {
            return;
        }
        this.configSet = configSet;
        double d = (this.configSet.largeToSmallRatio * this.configSet.largeQueueSize) + this.configSet.smallQueueSize;
        double round = Math.round(this.memoryPerNode * (1.0d - this.configSet.reserveMemoryRatio));
        double d2 = round / d;
        this.memoryPerLargeQuery = Math.round(d2 * this.configSet.largeToSmallRatio);
        this.memoryPerSmallQuery = Math.round(d2);
        logger.debug("Memory config: total memory per node = {}, available: {},  large/small memory ratio = {}", new Object[]{Long.valueOf(this.memoryPerNode), Double.valueOf(round), Double.valueOf(this.configSet.largeToSmallRatio)});
        logger.debug("Reserve memory ratio: {}, bytes: {}", Double.valueOf(this.configSet.reserveMemoryRatio), Double.valueOf(this.memoryPerNode - round));
        logger.debug("Minimum operator memory: {}", Long.valueOf(this.configSet.minimumOperatorMemory));
        logger.debug("Small queue: {} slots, {} bytes per slot", Integer.valueOf(this.configSet.smallQueueSize), Long.valueOf(this.memoryPerSmallQuery));
        logger.debug("Large queue: {} slots, {} bytes per slot", Integer.valueOf(this.configSet.largeQueueSize), Long.valueOf(this.memoryPerLargeQuery));
        logger.debug("Cost threshold: {}, timeout: {} ms.", Long.valueOf(this.configSet.queueThreshold), Integer.valueOf(this.configSet.queueTimeout));
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public boolean enabled() {
        return true;
    }

    public synchronized ZKQueueInfo getInfo() {
        refreshConfig();
        return new ZKQueueInfo(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void release(QueryQueue.QueueLease queueLease) {
        DistributedQueueLease distributedQueueLease = (DistributedQueueLease) queueLease;
        while (true) {
            try {
                distributedQueueLease.lease.close();
                distributedQueueLease.lease = null;
                return;
            } catch (InterruptedException e) {
                if (inShutdown()) {
                    logger.warn("In shutdown mode: abandoning attempt to release lease");
                }
            } catch (Exception e2) {
                logger.warn("Failure while releasing lease.", e2);
                return;
            }
        }
    }

    private boolean inShutdown() {
        if (this.statusAdapter == null) {
            return false;
        }
        return this.statusAdapter.inShutDown();
    }

    @Override // org.apache.drill.exec.work.foreman.rm.QueryQueue
    public void close() {
    }
}
