package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DiskBasedDominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DiskBasedResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

@InterfaceAudience.LimitedPrivate({"yarn"})
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.class */
public class FairScheduler extends AbstractYarnScheduler<FSAppAttempt, FSSchedulerNode> {
    private FairSchedulerConfiguration conf;
    private Resource incrAllocation;
    private QueueManager queueMgr;
    // Time source read through getClock(); volatile because setClock() may replace it.
    private volatile Clock clock;
    // Forwarded to the FSSchedulerNode constructor when a node is added.
    private boolean usePortForNodeName;
    private static final Log LOG = LogFactory.getLog(FairScheduler.class);
    private static final ResourceCalculator RESOURCE_CALCULATOR = new DiskBasedResourceCalculator();
    private static final ResourceCalculator DOMINANT_RESOURCE_CALCULATOR = new DiskBasedDominantResourceCalculator();
    public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
    // Milliseconds the UpdateThread sleeps before each update()/preemption pass.
    protected long updateInterval;
    private final int UPDATE_DEBUG_FREQUENCY = 5;
    // Countdown used by update() to throttle the debug-level cluster summary.
    private int updatesToSkipForDebug;

    @VisibleForTesting
    Thread updateThread;

    @VisibleForTesting
    Thread schedulingThread;
    protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
    FSQueueMetrics rootMetrics;
    FSOpDurations fsOpDurations;
    // Clock time recorded by updateStarvationStats() on every update().
    protected long lastPreemptionUpdateTime;
    // Clock time of the last pass of preemptTasksIfNecessary() that got past the interval check.
    private long lastPreemptCheckTime;
    protected boolean preemptionEnabled;
    protected float preemptionUtilizationThreshold;
    // Minimum clock-time gap between two preemption checks.
    protected long preemptionInterval;
    // Grace period between first marking a container for preemption and killing it.
    protected long waitTimeBeforeKill;
    // Containers already handed to warnOrKillContainer() that are revisited on later preemption passes.
    private List<RMContainer> warnedContainers;
    // For each warned container, the app attempt on whose behalf it was selected for preemption.
    private HashMap<RMContainer, FSAppAttempt> preemptMapping;
    // When true, getAppWeight() scales an app's weight by log2(1 + memory demand).
    protected boolean sizeBasedWeight;
    // Optional hook applied to the computed weight in getAppWeight(); may be null.
    protected WeightAdjuster weightAdjuster;
    protected boolean continuousSchedulingEnabled;
    // Milliseconds the ContinuousSchedulingThread sleeps between scheduling attempts.
    protected int continuousSchedulingSleepMs;
    private Comparator<NodeId> nodeAvailableResourceComparator;
    protected double nodeLocalityThreshold;
    protected double rackLocalityThreshold;
    protected long nodeLocalityDelayMs;
    protected long rackLocalityDelayMs;
    private FairSchedulerEventLog eventLog;
    protected boolean assignMultiple;
    protected int maxAssign;

    @VisibleForTesting
    final MaxRunningAppsEnforcer maxRunningEnforcer;
    private AllocationFileLoaderService allocsLoader;

    @VisibleForTesting
    AllocationConfiguration allocConf;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler$AllocationReloadListener.class */
    /**
     * {@link AllocationFileLoaderService.Listener} implementation that applies a
     * reloaded {@link AllocationConfiguration} to the enclosing scheduler.
     */
    public class AllocationReloadListener implements AllocationFileLoaderService.Listener {
        private AllocationReloadListener() {
        }

        /**
         * Installs the given configuration on the scheduler, re-initializes its
         * default scheduling policy against the current cluster resource, pushes
         * it to the queue manager and refreshes app runnability. All steps run
         * while holding the scheduler's monitor.
         *
         * @param allocationConfiguration the newly loaded allocation configuration
         */
        @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService.Listener
        public void onReload(AllocationConfiguration allocationConfiguration) {
            final FairScheduler scheduler = FairScheduler.this;
            synchronized (scheduler) {
                scheduler.allocConf = allocationConfiguration;
                scheduler.allocConf.getDefaultSchedulingPolicy().initialize(scheduler.clusterResource);
                scheduler.queueMgr.updateAllocationConfiguration(scheduler.allocConf);
                scheduler.maxRunningEnforcer.updateRunnabilityOnReload();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler$ContinuousSchedulingThread.class */
    /**
     * Thread that repeatedly runs a continuous scheduling attempt and then
     * sleeps for the configured interval, until it is interrupted.
     */
    public class ContinuousSchedulingThread extends Thread {
        private ContinuousSchedulingThread() {
        }

        /**
         * Loops until the thread's interrupt flag is set or an
         * {@link InterruptedException} is raised; the latter is logged and ends
         * the thread.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            // The handler only logs and exits, so a single try around the whole
            // loop is equivalent to one try per iteration.
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    FairScheduler.this.continuousSchedulingAttempt();
                    Thread.sleep(FairScheduler.this.getContinuousSchedulingSleepMs());
                }
            } catch (InterruptedException interrupted) {
                FairScheduler.LOG.warn("Continuous scheduling thread interrupted. Exiting.", interrupted);
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler$NodeAvailableResourceComparator.class */
    /**
     * Orders node ids so that nodes with more available resource come first;
     * ids that are not (or no longer) present in the scheduler's node map sort
     * after all known nodes.
     */
    private class NodeAvailableResourceComparator implements Comparator<NodeId> {
        private NodeAvailableResourceComparator() {
        }

        /**
         * Compares two node ids by the available resource of their scheduler nodes.
         *
         * @param nodeId  first node id
         * @param nodeId2 second node id
         * @return a negative value if {@code nodeId} should sort first, a positive
         *         value if {@code nodeId2} should sort first, and 0 if both are
         *         unknown or the resource calculator considers them equal
         */
        @Override // java.util.Comparator
        public int compare(NodeId nodeId, NodeId nodeId2) {
            // Look each node up exactly once: a containsKey() check followed by a
            // separate get() can hit a null if the node is removed in between.
            FSSchedulerNode first = (FSSchedulerNode) FairScheduler.this.nodes.get(nodeId);
            FSSchedulerNode second = (FSSchedulerNode) FairScheduler.this.nodes.get(nodeId2);
            if (first == null) {
                // Two unknown nodes must compare as equal; returning 1 in both
                // directions would violate the Comparator contract.
                return second == null ? 0 : 1;
            }
            if (second == null) {
                return -1;
            }
            // Arguments are deliberately swapped so larger availability sorts first.
            return FairScheduler.RESOURCE_CALCULATOR.compare(FairScheduler.this.clusterResource, second.getAvailableResource(), first.getAvailableResource());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler$UpdateThread.class */
    public class UpdateThread extends Thread {
        private UpdateThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(FairScheduler.this.updateInterval);
                    long time = FairScheduler.this.getClock().getTime();
                    FairScheduler.this.update();
                    FairScheduler.this.preemptTasksIfNecessary();
                    FairScheduler.this.fsOpDurations.addUpdateThreadRunDuration(FairScheduler.this.getClock().getTime() - time);
                } catch (InterruptedException e) {
                    FairScheduler.LOG.warn("Update thread interrupted. Exiting.");
                    return;
                } catch (Exception e2) {
                    FairScheduler.LOG.error("Exception in fair scheduler UpdateThread", e2);
                }
            }
        }
    }

    /**
     * Creates a scheduler named after this class and initializes its
     * preemption bookkeeping, node comparator, clock, allocation file loader,
     * queue manager and max-running-apps enforcer.
     */
    public FairScheduler() {
        super(FairScheduler.class.getName());
        // UPDATE_DEBUG_FREQUENCY and THREAD_JOIN_TIMEOUT_MS are final fields that
        // already carry initializers at their declarations; assigning them again
        // here is rejected by the compiler, so they are not repeated.
        this.updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
        this.warnedContainers = new ArrayList<>();
        this.preemptMapping = new HashMap<>();
        this.nodeAvailableResourceComparator = new NodeAvailableResourceComparator();
        this.clock = new SystemClock();
        this.allocsLoader = new AllocationFileLoaderService();
        this.queueMgr = new QueueManager(this);
        this.maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
    }

    /**
     * Checks that the configured minimum/maximum container allocations for
     * memory, vcores and disks form valid ranges.
     *
     * NOTE(review): the memory and disk minimum defaults (and the disk lower
     * bound) are taken from constants named *_INCREMENT_ALLOCATION_*. This may be
     * a decompiler constant-matching artifact; confirm against the original source.
     *
     * @param configuration configuration to validate
     * @throws YarnRuntimeException if a minimum is below its lower bound or
     *         exceeds the corresponding maximum
     */
    private void validateConf(Configuration configuration) {
        // Memory (MB).
        final int minMemoryMb = configuration.getInt("yarn.scheduler.minimum-allocation-mb", FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
        final int maxMemoryMb = configuration.getInt("yarn.scheduler.maximum-allocation-mb", 8192);
        if (minMemoryMb < 0 || minMemoryMb > maxMemoryMb) {
            final String message = "Invalid resource scheduler memory allocation configuration, yarn.scheduler.minimum-allocation-mb=" + minMemoryMb + ", yarn.scheduler.maximum-allocation-mb=" + maxMemoryMb + ", min should equal greater than 0, max should be no smaller than min.";
            throw new YarnRuntimeException(message);
        }

        // Virtual cores.
        final int minVcores = configuration.getInt("yarn.scheduler.minimum-allocation-vcores", 1);
        final int maxVcores = configuration.getInt("yarn.scheduler.maximum-allocation-vcores", 4);
        if (minVcores < 0 || minVcores > maxVcores) {
            final String message = "Invalid resource scheduler vcores allocation configuration, yarn.scheduler.minimum-allocation-vcores=" + minVcores + ", yarn.scheduler.maximum-allocation-vcores=" + maxVcores + ", min should equal greater than 0, max should be no smaller than min.";
            throw new YarnRuntimeException(message);
        }

        // Disks (fractional values allowed).
        final double minDisks = configuration.getDouble("yarn.scheduler.minimum-allocation-disks", FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_DISKS);
        final double maxDisks = configuration.getDouble("yarn.scheduler.maximum-allocation-disks", 4.0d);
        if (minDisks < FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_DISKS || minDisks > maxDisks) {
            final String message = "Invalid resource scheduler disk allocation configuration, yarn.scheduler.minimum-allocation-disks=" + minDisks + ", yarn.scheduler.maximum-allocation-disks=" + maxDisks + ", min should equal greater than 0, max should be no smaller than min.";
            throw new YarnRuntimeException(message);
        }
    }

    /**
     * @return the fair scheduler configuration held by this scheduler
     */
    public FairSchedulerConfiguration getConf() {
        return conf;
    }

    /**
     * @return the queue manager created by this scheduler's constructor
     */
    public QueueManager getQueueManager() {
        return queueMgr;
    }

    /**
     * Recomputes scheduler-wide state: refreshes starvation stats, updates
     * demand and fair shares starting from the root queue, refreshes the root
     * queue metrics and records how long the call took. When debug logging is
     * enabled, a cluster summary is logged once the skip counter runs out.
     */
    protected synchronized void update() {
        final long startedAt = getClock().getTime();
        updateStarvationStats();

        final FSParentQueue root = queueMgr.getRootQueue();
        root.updateDemand();
        root.setFairShare(clusterResource);
        root.recomputeShares();
        updateRootQueueMetrics();

        // The counter is only decremented while debug logging is on.
        if (LOG.isDebugEnabled() && --updatesToSkipForDebug < 0) {
            updatesToSkipForDebug = UPDATE_DEBUG_FREQUENCY;
            LOG.debug("Cluster Capacity: " + clusterResource + "  Allocations: " + rootMetrics.getAllocatedResources() + "  Availability: " + Resource.newInstance(rootMetrics.getAvailableMB(), rootMetrics.getAvailableVirtualCores()) + "  Demand: " + root.getDemand());
        }

        fsOpDurations.addUpdateCallDuration(getClock().getTime() - startedAt);
    }

    /**
     * Records the current clock time as the last preemption update time and
     * asks every leaf queue to refresh its starvation statistics.
     */
    private void updateStarvationStats() {
        lastPreemptionUpdateTime = clock.getTime();
        for (FSLeafQueue leaf : queueMgr.getLeafQueues()) {
            leaf.updateStarvationStats();
        }
    }

    /**
     * Runs one preemption check if preemption should be attempted and at least
     * {@code preemptionInterval} has elapsed since the previous check. Collects
     * the per-app amounts to preempt from every leaf queue and, if any exist,
     * hands them to {@link #preemptResources(HashMap)}.
     */
    protected synchronized void preemptTasksIfNecessary() {
        if (!shouldAttemptPreemption()) {
            return;
        }
        final long now = getClock().getTime();
        if (now - lastPreemptCheckTime < preemptionInterval) {
            return;
        }
        lastPreemptCheckTime = now;

        final HashMap<FSAppAttempt, Resource> toPreempt = new HashMap<>();
        for (FSLeafQueue leaf : queueMgr.getLeafQueues()) {
            toPreempt.putAll(resToPreempt(leaf, now));
        }
        if (!toPreempt.isEmpty()) {
            preemptResources(toPreempt);
        }
    }

    /**
     * Preempts containers until the per-app amounts in {@code hashMap} are
     * covered or no further container can be chosen.
     *
     * <p>First revisits containers warned on earlier passes (warning or killing
     * them again and deducting their resource), dropping those that are no
     * longer RUNNING/ALLOCATED or once nothing is left to preempt. Then it
     * repeatedly asks the root queue for a container to preempt and credits it
     * to the first requesting app that has not blacklisted the container's node.
     *
     * @param hashMap remaining resource to preempt per app attempt; entries are
     *                reduced or removed as containers are selected
     */
    protected void preemptResources(HashMap<FSAppAttempt, Resource> hashMap) {
        final long startedAt = getClock().getTime();
        if (hashMap.isEmpty()) {
            return;
        }

        // Pass 1: containers already warned on previous rounds.
        for (Iterator<RMContainer> warned = warnedContainers.iterator(); warned.hasNext();) {
            final RMContainer candidate = warned.next();
            final boolean stillLive = candidate.getState() == RMContainerState.RUNNING
                    || candidate.getState() == RMContainerState.ALLOCATED;
            if (!stillLive || hashMap.isEmpty()) {
                warned.remove();
                continue;
            }
            warnOrKillContainer(candidate);
            updateResourceToPreempt(candidate, hashMap, preemptMapping.get(candidate));
        }

        try {
            for (FSLeafQueue leaf : getQueueManager().getLeafQueues()) {
                leaf.resetPreemptedResources();
                leaf.resetNonEligibleForPreemptionSet();
            }

            // Pass 2: pick fresh containers while something is still owed.
            while (!hashMap.isEmpty()) {
                final RMContainer victim = getQueueManager().getRootQueue().preemptContainer();
                if (victim == null) {
                    break;
                }
                final FSSchedulerNode host = (FSSchedulerNode) nodes.get(victim.getContainer().getNodeId());

                // First requesting app that could actually use this node.
                FSAppAttempt beneficiary = null;
                for (Map.Entry<FSAppAttempt, Resource> wanted : hashMap.entrySet()) {
                    if (!SchedulerAppUtils.isBlacklisted(wanted.getKey(), host, LOG)) {
                        beneficiary = wanted.getKey();
                        break;
                    }
                }

                if (beneficiary != null) {
                    warnOrKillContainer(victim);
                    warnedContainers.add(victim);
                    preemptMapping.put(victim, beneficiary);
                    updateResourceToPreempt(victim, hashMap, beneficiary);
                } else {
                    // Nobody can use this node: exclude the container from further selection.
                    getSchedulerApp(victim.getApplicationAttemptId()).addNonEligibleForPreemption(victim);
                }
            }
            fsOpDurations.addPreemptCallDuration(getClock().getTime() - startedAt);
        } finally {
            for (FSLeafQueue leaf : getQueueManager().getLeafQueues()) {
                leaf.clearPreemptedResources();
                leaf.resetNonEligibleForPreemptionSet();
            }
        }
    }

    /**
     * Deducts the resource of a preempted container from what is still owed to
     * the given app attempt, removing the app from the map once its need is
     * covered.
     *
     * @param rMContainer  container selected for preemption
     * @param hashMap      remaining resource to preempt per app attempt
     * @param fSAppAttempt app attempt the container is preempted for; may be
     *                     null or absent from {@code hashMap}
     */
    private void updateResourceToPreempt(RMContainer rMContainer, HashMap<FSAppAttempt, Resource> hashMap, FSAppAttempt fSAppAttempt) {
        final Resource outstanding = hashMap.get(fSAppAttempt);
        if (outstanding == null) {
            // A container warned on an earlier pass may map to an app that is not
            // owed anything this round (or to no app at all); there is nothing to
            // deduct, and comparing against null would throw.
            return;
        }
        final Resource reclaimed = rMContainer.getContainer().getResource();
        if (Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterResource, outstanding, reclaimed)) {
            // Store a new Resource instead of subtracting in place: resToPreempt()
            // puts the object returned by FSAppAttempt.getDemand() straight into
            // the map, so an in-place subtraction could alter that object if it
            // is the app's own instance (not verifiable from this file).
            hashMap.put(fSAppAttempt, Resources.subtract(outstanding, reclaimed));
        } else {
            hashMap.remove(fSAppAttempt);
        }
    }

    /**
     * Marks a container for preemption the first time it is seen; on later
     * calls, kills it once more than {@code waitTimeBeforeKill} has passed since
     * it was first marked.
     *
     * @param rMContainer container to warn or kill
     */
    protected void warnOrKillContainer(RMContainer rMContainer) {
        final FSAppAttempt owner = getSchedulerApp(rMContainer.getApplicationAttemptId());
        LOG.info("Preempting container (prio=" + rMContainer.getContainer().getPriority() + "res=" + rMContainer.getContainer().getResource() + ") from queue " + owner.getQueue().getName());

        final Long markedAt = owner.getContainerPreemptionTime(rMContainer);
        if (markedAt == null) {
            // First sighting: just record when the container was marked.
            owner.addPreemption(rMContainer, getClock().getTime());
            return;
        }
        if (markedAt.longValue() + waitTimeBeforeKill >= getClock().getTime()) {
            // Still within the grace period.
            return;
        }

        final ContainerStatus preemptedStatus = SchedulerUtils.createPreemptedContainerStatus(rMContainer.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
        recoverResourceRequestForContainer(rMContainer);
        completedContainer(rMContainer, preemptedStatus, RMContainerEventType.KILL);
        LOG.info("Killing container" + rMContainer + " (after waiting for premption for " + (getClock().getTime() - markedAt.longValue()) + "ms)");
    }

    /**
     * Computes how much resource should be preempted on behalf of a leaf queue
     * and distributes that amount over its runnable apps.
     *
     * <p>If the queue has been below its min share longer than its min-share
     * timeout, the shortfall is min(minShare, demand) - usage; likewise for the
     * fair share and its timeout. The larger of the two (floored at none) is
     * then assigned to the queue's runnable apps in iteration order, each app
     * receiving at most its own demand.
     *
     * @param fSLeafQueue queue to evaluate
     * @param j           current time, compared against the queue's last-at-share timestamps
     * @return per-app amounts to preempt; empty when nothing should be preempted
     */
    protected HashMap<FSAppAttempt, Resource> resToPreempt(FSLeafQueue fSLeafQueue, long j) {
        final long minShareTimeout = fSLeafQueue.getMinSharePreemptionTimeout();
        final long fairShareTimeout = fSLeafQueue.getFairSharePreemptionTimeout();

        Resource resDueToMinShare = Resources.none();
        if (j - fSLeafQueue.getLastTimeAtMinShare() > minShareTimeout) {
            final Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, fSLeafQueue.getMinShare(), fSLeafQueue.getDemand());
            resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, fSLeafQueue.getResourceUsage()));
        }

        Resource resDueToFairShare = Resources.none();
        if (j - fSLeafQueue.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
            final Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, fSLeafQueue.getFairShare(), fSLeafQueue.getDemand());
            resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, fSLeafQueue.getResourceUsage()));
        }

        Resource remaining = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare);
        final HashMap<FSAppAttempt, Resource> perApp = new HashMap<>();
        if (!Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, remaining, Resources.none())) {
            return perApp;
        }

        LOG.info("Should preempt " + remaining + " res for queue " + fSLeafQueue.getName() + ": resDueToMinShare = " + resDueToMinShare + ", resDueToFairShare = " + resDueToFairShare);
        for (FSAppAttempt app : fSLeafQueue.getCopyOfRunnableAppSchedulables()) {
            final Resource appDemand = app.getDemand();
            if (Resources.greaterThanOrEqual(RESOURCE_CALCULATOR, clusterResource, appDemand, remaining)) {
                // This app alone can absorb everything that is left.
                perApp.put(app, remaining);
                break;
            }
            perApp.put(app, appDemand);
            remaining = Resources.subtract(remaining, appDemand);
            if (Resources.lessThanOrEqual(RESOURCE_CALCULATOR, clusterResource, remaining, Resources.none())) {
                break;
            }
        }
        return perApp;
    }

    /**
     * @return the container token secret manager obtained from the RM context
     */
    public synchronized RMContainerTokenSecretManager getContainerTokenSecretManager() {
        return rmContext.getContainerTokenSecretManager();
    }

    /**
     * Computes the scheduling weight of an app attempt and stores it in the
     * attempt's {@link ResourceWeights}.
     *
     * <p>The base weight is 1, or log2(1 + memory demand) when size-based
     * weighting is on. It is multiplied by the app's priority and, if a weight
     * adjuster is set, passed through it.
     *
     * @param fSAppAttempt app attempt to weigh
     * @return the attempt's resource weights, updated with the computed weight
     */
    public synchronized ResourceWeights getAppWeight(FSAppAttempt fSAppAttempt) {
        double weight = sizeBasedWeight
                ? Math.log1p(fSAppAttempt.getDemand().getMemory()) / Math.log(2.0d)
                : 1.0d;
        weight *= fSAppAttempt.getPriority().getPriority();
        if (weightAdjuster != null) {
            weight = weightAdjuster.adjustWeight(fSAppAttempt, weight);
        }
        final ResourceWeights weights = fSAppAttempt.getResourceWeights();
        weights.setWeight((float) weight);
        return weights;
    }

    /**
     * @return the increment allocation resource held by this scheduler
     */
    public Resource getIncrementResourceCapability() {
        return incrAllocation;
    }

    /**
     * Looks up the scheduler node registered under the given id.
     *
     * @param nodeId id of the node
     * @return whatever the node map holds for {@code nodeId}, cast to {@link FSSchedulerNode}
     */
    private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
        final Object node = nodes.get(nodeId);
        return (FSSchedulerNode) node;
    }

    /**
     * @return the node-locality threshold held by this scheduler
     */
    public double getNodeLocalityThreshold() {
        return nodeLocalityThreshold;
    }

    /**
     * @return the rack-locality threshold held by this scheduler
     */
    public double getRackLocalityThreshold() {
        return rackLocalityThreshold;
    }

    /**
     * @return the node-locality delay, in milliseconds as the name indicates
     */
    public long getNodeLocalityDelayMs() {
        return nodeLocalityDelayMs;
    }

    /**
     * @return the rack-locality delay, in milliseconds as the name indicates
     */
    public long getRackLocalityDelayMs() {
        return rackLocalityDelayMs;
    }

    /**
     * @return the value of the continuous-scheduling flag
     */
    public boolean isContinuousSchedulingEnabled() {
        return continuousSchedulingEnabled;
    }

    /**
     * @return the number of milliseconds the continuous scheduling thread
     *         sleeps between two scheduling attempts
     */
    public synchronized int getContinuousSchedulingSleepMs() {
        return continuousSchedulingSleepMs;
    }

    /**
     * @return the clock this scheduler currently reads time from
     */
    public Clock getClock() {
        return clock;
    }

    /**
     * Replaces the clock this scheduler reads time from.
     *
     * @param clock the clock to use from now on
     */
    @VisibleForTesting
    void setClock(Clock clock) {
        FairScheduler.this.clock = clock;
    }

    /**
     * @return the scheduler event log held by this scheduler
     */
    public FairSchedulerEventLog getEventLog() {
        return eventLog;
    }

    /**
     * Registers a newly submitted application with the scheduler.
     *
     * <p>The application is rejected (an {@link RMAppRejectedEvent} is
     * dispatched) when the queue name is empty, starts or ends with a period,
     * or when the user has neither submit nor administer access to the
     * resolved queue. If queue assignment yields no queue the method returns
     * without further action.
     *
     * @param applicationId id of the application
     * @param str           requested queue name
     * @param str2          submitting user
     * @param z             when true, the APP_ACCEPTED event is not dispatched
     *                      (the application is being recovered)
     */
    protected synchronized void addApplication(ApplicationId applicationId, String str, String str2, boolean z) {
        // Validate the raw queue name first; both failures are handled the same way.
        String rejection = null;
        if (str == null || str.isEmpty()) {
            rejection = "Reject application " + applicationId + " submitted by user " + str2 + " with an empty queue name.";
        } else if (str.startsWith(CapacitySchedulerConfiguration.DOT) || str.endsWith(CapacitySchedulerConfiguration.DOT)) {
            rejection = "Reject application " + applicationId + " submitted by user " + str2 + " with an illegal queue name " + str + ". The queue name cannot start/end with period.";
        }
        if (rejection != null) {
            LOG.info(rejection);
            rmContext.getDispatcher().getEventHandler().handle(new RMAppRejectedEvent(applicationId, rejection));
            return;
        }

        final FSLeafQueue queue = assignToQueue(rmContext.getRMApps().get(applicationId), str, str2);
        if (queue == null) {
            return;
        }

        final UserGroupInformation submitter = UserGroupInformation.createRemoteUser(str2);
        final boolean allowed = queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, submitter)
                || queue.hasAccess(QueueACL.ADMINISTER_QUEUE, submitter);
        if (!allowed) {
            final String denied = "User " + submitter.getUserName() + " cannot submit applications to queue " + queue.getName();
            LOG.info(denied);
            rmContext.getDispatcher().getEventHandler().handle(new RMAppRejectedEvent(applicationId, denied));
            return;
        }

        applications.put(applicationId, new SchedulerApplication(queue, str2));
        queue.getMetrics().submitApp(str2);
        LOG.info("Accepted application " + applicationId + " from user: " + str2 + ", in queue: " + str + ", currently num of applications: " + applications.size());

        if (z) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
            }
        } else {
            rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
        }
    }

    /**
     * Creates the scheduler-side attempt for an application attempt, adds it to
     * the application's leaf queue and tracks it as runnable or non-runnable
     * according to the max-running-apps enforcer.
     *
     * @param applicationAttemptId id of the new attempt
     * @param z                    when true, state is transferred from the
     *                             application's current attempt
     * @param z2                   when true, the ATTEMPT_ADDED event is not
     *                             dispatched (the attempt is being recovered)
     */
    protected synchronized void addApplicationAttempt(ApplicationAttemptId applicationAttemptId, boolean z, boolean z2) {
        final SchedulerApplication application = (SchedulerApplication) applications.get(applicationAttemptId.getApplicationId());
        final String user = application.getUser();
        final FSLeafQueue queue = (FSLeafQueue) application.getQueue();

        final FSAppAttempt attempt = new FSAppAttempt(this, applicationAttemptId, user, queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext);
        if (z) {
            attempt.transferStateFromPreviousAttempt(application.getCurrentAppAttempt());
        }
        application.setCurrentAppAttempt(attempt);

        final boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
        queue.addApp(attempt, runnable);
        if (!runnable) {
            maxRunningEnforcer.trackNonRunnableApp(attempt);
        } else {
            maxRunningEnforcer.trackRunnableApp(attempt);
        }

        queue.getMetrics().submitAppAttempt(user);
        LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);

        if (z2) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(applicationAttemptId + " is recovering. Skipping notifying ATTEMPT_ADDED");
            }
        } else {
            rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED));
        }
    }

    /**
     * Resolves the leaf queue an application should run in, using the queue
     * placement policy of the current allocation configuration.
     *
     * <p>When placement fails and an {@link RMApp} is available, the failure is
     * logged, an {@link RMAppRejectedEvent} is dispatched and {@code null} is
     * returned. On success with an {@link RMApp}, the app's queue is set to the
     * resolved queue's name.
     *
     * @param rMApp the RM application, or null if it could not be found
     * @param str   requested queue name
     * @param str2  submitting user
     * @return the resolved leaf queue, or null if the application was rejected
     *         or no leaf queue could be determined
     */
    @VisibleForTesting
    FSLeafQueue assignToQueue(RMApp rMApp, String str, String str2) {
        FSLeafQueue fSLeafQueue = null;
        String rejectionReason = null;
        boolean reasonLogged = false;
        try {
            str = this.allocConf.getPlacementPolicy().assignAppToQueue(str, str2);
            if (str == null) {
                rejectionReason = "Application rejected by queue placement policy";
            } else {
                fSLeafQueue = this.queueMgr.getLeafQueue(str, true);
                if (fSLeafQueue == null) {
                    rejectionReason = str + " is not a leaf queue";
                }
            }
        } catch (IOException e) {
            rejectionReason = "Error assigning app to queue " + str;
            // Log here, with the cause, so the underlying failure is recorded
            // even when there is no RMApp to send a rejection to.
            LOG.error(rejectionReason, e);
            reasonLogged = true;
        }
        if (rejectionReason != null && rMApp != null) {
            if (!reasonLogged) {
                LOG.error(rejectionReason);
            }
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppRejectedEvent(rMApp.getApplicationId(), rejectionReason));
            return null;
        }
        if (rMApp != null) {
            // Reaching here with an RMApp means no rejection reason was set, so
            // the leaf queue was resolved.
            rMApp.setQueue(fSLeafQueue.getName());
        } else {
            LOG.error("Couldn't find RM app to set queue name on");
        }
        return fSLeafQueue;
    }

    /**
     * Stops and forgets an application; logs a warning if the scheduler does
     * not know it.
     *
     * @param applicationId id of the application to remove
     * @param rMAppState    final state passed to the application's {@code stop}
     */
    private synchronized void removeApplication(ApplicationId applicationId, RMAppState rMAppState) {
        final SchedulerApplication application = (SchedulerApplication) applications.get(applicationId);
        if (application != null) {
            application.stop(rMAppState);
            applications.remove(applicationId);
            return;
        }
        LOG.warn("Couldn't find application " + applicationId);
    }

    /**
     * Tears down a finished application attempt: completes its live and
     * reserved containers, stops the attempt, removes it from its leaf queue
     * and updates the max-running-apps bookkeeping.
     *
     * @param applicationAttemptId id of the finished attempt
     * @param rMAppAttemptState    final state passed to the attempt's {@code stop}
     * @param z                    when true, live containers in RUNNING state
     *                             are left alone instead of being killed
     */
    private synchronized void removeApplicationAttempt(ApplicationAttemptId applicationAttemptId, RMAppAttemptState rMAppAttemptState, boolean z) {
        LOG.info("Application " + applicationAttemptId + " is done. finalState=" + rMAppAttemptState);
        final SchedulerApplication application = (SchedulerApplication) applications.get(applicationAttemptId.getApplicationId());
        final FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
        if (attempt == null || application == null) {
            LOG.info("Unknown application " + applicationAttemptId + " has completed!");
            return;
        }

        for (RMContainer live : attempt.getLiveContainers()) {
            final boolean keepRunning = z && live.getState().equals(RMContainerState.RUNNING);
            if (keepRunning) {
                LOG.info("Skip killing " + live.getContainerId());
                continue;
            }
            completedContainer(live, SchedulerUtils.createAbnormalContainerStatus(live.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL);
        }

        for (RMContainer reserved : attempt.getReservedContainers()) {
            completedContainer(reserved, SchedulerUtils.createAbnormalContainerStatus(reserved.getContainerId(), "Application Complete"), RMContainerEventType.KILL);
        }

        attempt.stop(rMAppAttemptState);

        // removeApp() reports whether the attempt was among the queue's runnable apps.
        final boolean wasRunnable = queueMgr.getLeafQueue(attempt.getQueue().getQueueName(), false).removeApp(attempt);
        if (wasRunnable) {
            maxRunningEnforcer.untrackRunnableApp(attempt);
            maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt, attempt.getQueue());
        } else {
            maxRunningEnforcer.untrackNonRunnableApp(attempt);
        }
    }

    /**
     * Processes the completion of a container. A container in RESERVED state only has
     * its reservation dropped; any other container is reported as completed to its
     * attempt, released from its node, and the root queue metrics are refreshed.
     *
     * @param rMContainer          the container that completed; {@code null} is logged and ignored
     * @param containerStatus      completion status passed on to the attempt
     * @param rMContainerEventType event that triggered the completion
     */
    @Override
    protected synchronized void completedContainer(RMContainer rMContainer, ContainerStatus containerStatus, RMContainerEventType rMContainerEventType) {
        if (rMContainer == null) {
            LOG.info("Null container completed...");
            return;
        }

        Container container = rMContainer.getContainer();
        FSAppAttempt attempt = getCurrentAttemptForContainer(container.getId());
        ApplicationId appId = container.getId().getApplicationAttemptId().getApplicationId();
        if (attempt == null) {
            LOG.info("Container " + container + " of unknown application attempt " + appId + " completed with event " + rMContainerEventType);
            return;
        }

        FSSchedulerNode node = getFSSchedulerNode(container.getNodeId());
        if (rMContainer.getState() != RMContainerState.RESERVED) {
            attempt.containerCompleted(rMContainer, containerStatus, rMContainerEventType);
            node.releaseContainer(container);
            updateRootQueueMetrics();
        } else {
            attempt.unreserve(rMContainer.getReservedPriority(), node);
        }

        LOG.info("Application attempt " + attempt.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node + " with event: " + rMContainerEventType);
    }

    /**
     * Registers a new node with the scheduler: tracks it in {@code nodes}, adds its
     * total capability to the cluster resource, and refreshes metrics, the maximum
     * allocation and the steady fair shares.
     *
     * @param rMNode the node being added
     */
    private synchronized void addNode(RMNode rMNode) {
        FSSchedulerNode schedulerNode = new FSSchedulerNode(rMNode, usePortForNodeName);
        nodes.put(rMNode.getNodeID(), schedulerNode);

        Resources.addTo(clusterResource, rMNode.getTotalCapability());
        updateRootQueueMetrics();
        updateMaximumAllocation(schedulerNode, true);

        // Steady fair shares are derived from the (now larger) cluster resource.
        queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
        queueMgr.getRootQueue().recomputeSteadyShares();

        LOG.info("Added node " + rMNode.getNodeAddress() + " cluster capacity: " + clusterResource);
    }

    /**
     * Removes a node from the scheduler. The node's capability is subtracted from the
     * cluster resource, every running container and any reserved container on it is
     * completed with a KILL event, and the steady fair shares and maximum allocation
     * are recomputed. Unknown nodes are ignored.
     *
     * @param rMNode the node being removed
     */
    private synchronized void removeNode(RMNode rMNode) {
        FSSchedulerNode schedulerNode = getFSSchedulerNode(rMNode.getNodeID());
        if (schedulerNode == null) {
            return;
        }

        Resources.subtractFrom(clusterResource, rMNode.getTotalCapability());
        updateRootQueueMetrics();

        // Everything that was running on the node is now lost.
        for (RMContainer running : schedulerNode.getRunningContainers()) {
            completedContainer(running, SchedulerUtils.createAbnormalContainerStatus(running.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
        }

        RMContainer reserved = schedulerNode.getReservedContainer();
        if (reserved != null) {
            completedContainer(reserved, SchedulerUtils.createAbnormalContainerStatus(reserved.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
        }

        nodes.remove(rMNode.getNodeID());
        queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
        queueMgr.getRootQueue().recomputeSteadyShares();
        updateMaximumAllocation(schedulerNode, false);

        LOG.info("Removed node " + rMNode.getNodeAddress() + " cluster capacity: " + clusterResource);
    }

    /**
     * Handles an allocate call from an application attempt: normalizes the resource
     * requests, releases the containers the attempt gave back, updates the attempt's
     * requests and blacklist, and returns the newly allocated containers together
     * with the headroom and the containers marked for preemption.
     *
     * @param applicationAttemptId the calling attempt
     * @param list                 resource requests (the "ask")
     * @param list2                containers to release
     * @param list3                first blacklist argument passed to {@code updateBlacklist}
     * @param list4                second blacklist argument passed to {@code updateBlacklist}
     * @return the allocation, or {@code EMPTY_ALLOCATION} when the attempt is unknown
     */
    @Override
    public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> list, List<ContainerId> list2, List<String> list3, List<String> list4) {
        FSAppAttempt attempt = getSchedulerApp(applicationAttemptId);
        if (attempt == null) {
            LOG.info("Calling allocate on removed or non existant application " + applicationAttemptId);
            return EMPTY_ALLOCATION;
        }

        SchedulerUtils.normalizeRequests(list, new DiskBasedResourceCalculator(), clusterResource, minimumAllocation, getMaximumResourceCapability(), incrAllocation);

        // A single request from a managed-AM attempt with no live containers is
        // recorded as the AM resource.
        boolean isAmRequest = !attempt.getUnmanagedAM() && list.size() == 1 && attempt.getLiveContainers().isEmpty();
        if (isAmRequest) {
            attempt.setAMResource(list.get(0).getCapability());
        }

        releaseContainers(list2, attempt);

        synchronized (attempt) {
            if (!list.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("allocate: pre-update applicationAttemptId=" + applicationAttemptId + " application=" + attempt.getApplicationId());
                }
                attempt.showRequests();
                attempt.updateResourceRequests(list);
                attempt.showRequests();
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("allocate: post-update applicationAttemptId=" + applicationAttemptId + " #ask=" + list.size() + " reservation= " + attempt.getCurrentReservation());
                LOG.debug("Preempting " + attempt.getPreemptionContainers().size() + " container(s)");
            }

            Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
            for (RMContainer toPreempt : attempt.getPreemptionContainers()) {
                preemptionContainerIds.add(toPreempt.getContainerId());
            }

            attempt.updateBlacklist(list3, list4);
            SchedulerApplicationAttempt.ContainersAndNMTokensAllocation newlyAllocated = attempt.pullNewlyAllocatedContainersAndNMTokens();
            Resource headroom = attempt.getHeadroom();
            attempt.setApplicationHeadroomForMetrics(headroom);
            return new Allocation(newlyAllocated.getContainerList(), headroom, preemptionContainerIds, null, null, newlyAllocated.getNMTokenList());
        }
    }

    /**
     * Processes a node heartbeat: records launched and completed containers reported
     * by the node, then attempts scheduling on it. With continuous scheduling enabled,
     * scheduling is attempted here only when the heartbeat reported completed
     * containers. The elapsed time is recorded in {@code fsOpDurations}.
     *
     * @param rMNode the node that sent the update
     */
    private synchronized void nodeUpdate(RMNode rMNode) {
        long start = getClock().getTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("nodeUpdate: " + rMNode + " cluster capacity: " + clusterResource);
        }
        eventLog.log("HEARTBEAT", rMNode.getHostName());
        FSSchedulerNode node = getFSSchedulerNode(rMNode.getNodeID());

        // Flatten the per-update lists into one list per category.
        List<UpdatedContainerInfo> updates = rMNode.pullContainerUpdates();
        List<ContainerStatus> launched = new ArrayList<ContainerStatus>();
        List<ContainerStatus> completed = new ArrayList<ContainerStatus>();
        for (UpdatedContainerInfo update : updates) {
            launched.addAll(update.getNewlyLaunchedContainers());
            completed.addAll(update.getCompletedContainers());
        }

        for (ContainerStatus launchedStatus : launched) {
            containerLaunchedOnNode(launchedStatus.getContainerId(), node);
        }

        for (ContainerStatus completedStatus : completed) {
            ContainerId containerId = completedStatus.getContainerId();
            LOG.debug("Container FINISHED: " + containerId);
            completedContainer(getRMContainer(containerId), completedStatus, RMContainerEventType.FINISHED);
        }

        if (!continuousSchedulingEnabled || !completed.isEmpty()) {
            attemptScheduling(node);
        }

        fsOpDurations.addNodeUpdateDuration(getClock().getTime() - start);
    }

    /**
     * Runs one pass of continuous scheduling: takes a snapshot of the node ids, sorts
     * it with {@code nodeAvailableResourceComparator} while holding the scheduler
     * lock, and attempts scheduling on every node that can still fit the minimum
     * allocation. Failures on one node are logged and do not stop the pass.
     *
     * @throws InterruptedException declared by the signature
     */
    void continuousSchedulingAttempt() throws InterruptedException {
        long start = getClock().getTime();

        List<NodeId> nodeIds = new ArrayList<NodeId>(nodes.keySet());
        synchronized (this) {
            Collections.sort(nodeIds, nodeAvailableResourceComparator);
        }

        for (NodeId nodeId : nodeIds) {
            FSSchedulerNode node = getFSSchedulerNode(nodeId);
            if (node == null) {
                // The node is no longer known to the scheduler.
                continue;
            }
            try {
                if (Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
                    attemptScheduling(node);
                }
            } catch (Throwable th) {
                LOG.error("Error while attempting scheduling for node " + node + ": " + th.toString(), th);
            }
        }

        fsOpDurations.addContinuousSchedulingRunDuration(getClock().getTime() - start);
    }

    /**
     * Tries to place containers on the given node. An existing reservation is either
     * fulfilled or, if it can no longer be satisfied, released; when no reservation
     * remains, containers are assigned from the root queue, repeatedly if
     * {@code assignMultiple} is set and {@code maxAssign} has not been reached.
     * Nothing happens while work-preserving recovery is enabled and the scheduler is
     * not yet ready to allocate containers.
     *
     * @param fSSchedulerNode the node to schedule on
     */
    private synchronized void attemptScheduling(FSSchedulerNode fSSchedulerNode) {
        if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) {
            return;
        }

        FSAppAttempt reservedApp = fSSchedulerNode.getReservedAppSchedulable();
        if (reservedApp != null) {
            Priority reservedPriority = fSSchedulerNode.getReservedContainer().getReservedPriority();
            FSLeafQueue reservedQueue = reservedApp.getQueue();
            boolean stillSatisfiable = reservedApp.hasContainerForNode(reservedPriority, fSSchedulerNode)
                    && fitsInMaxShare(reservedQueue, fSSchedulerNode.getReservedContainer().getReservedResource());
            if (stillSatisfiable) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Trying to fulfill reservation for application " + reservedApp.getApplicationAttemptId() + " on node: " + fSSchedulerNode);
                }
                fSSchedulerNode.getReservedAppSchedulable().assignReservedContainer(fSSchedulerNode);
            } else {
                LOG.info("Releasing reservation that cannot be satisfied for application " + reservedApp.getApplicationAttemptId() + " on node " + fSSchedulerNode);
                reservedApp.unreserve(reservedPriority, fSSchedulerNode);
                reservedApp = null;
            }
        }

        if (reservedApp == null) {
            int assignedCount = 0;
            while (fSSchedulerNode.getReservedContainer() == null) {
                boolean assigned = !queueMgr.getRootQueue().assignContainer(fSSchedulerNode).equals(Resources.none());
                if (!assigned) {
                    break;
                }
                assignedCount++;
                if (!assignMultiple) {
                    break;
                }
                // A non-positive maxAssign places no cap on the number of assignments.
                if (maxAssign > 0 && assignedCount >= maxAssign) {
                    break;
                }
            }
        }

        updateRootQueueMetrics();
    }

    /**
     * Checks whether adding {@code resource} to the usage of the given queue, and of
     * each of its ancestors, stays within the respective max share.
     *
     * @param fSQueue  queue at which the check starts
     * @param resource additional resource to account for
     * @return {@code false} as soon as one queue on the path would exceed its max
     *         share, {@code true} otherwise
     */
    public static boolean fitsInMaxShare(FSQueue fSQueue, Resource resource) {
        FSQueue current = fSQueue;
        do {
            Resource usageWithRequest = Resources.add(current.getResourceUsage(), resource);
            if (!Resources.fitsIn(usageWithRequest, current.getMaxShare())) {
                return false;
            }
            current = current.getParent();
        } while (current != null);
        return true;
    }

    /**
     * Looks up the attempt through the superclass and returns it as an
     * {@link FSAppAttempt}.
     *
     * @param applicationAttemptId id of the attempt to look up
     * @return whatever {@code super.getApplicationAttempt} returns, cast to {@code FSAppAttempt}
     */
    public FSAppAttempt getSchedulerApp(ApplicationAttemptId applicationAttemptId) {
        FSAppAttempt attempt = (FSAppAttempt) super.getApplicationAttempt(applicationAttemptId);
        return attempt;
    }

    /**
     * @return the shared {@code RESOURCE_CALCULATOR} constant of this scheduler
     */
    @Override
    public ResourceCalculator getResourceCalculator() {
        return RESOURCE_CALCULATOR;
    }

    /**
     * Publishes the root queue's available resources, computed as the cluster
     * resource minus what the root metrics report as allocated.
     */
    private void updateRootQueueMetrics() {
        Resource available = Resources.subtract(clusterResource, rootMetrics.getAllocatedResources());
        rootMetrics.setAvailableResourcesToQueue(available);
    }

    /**
     * Decides whether preemption should be attempted: preemption must be enabled and
     * the larger of the memory and virtual-core utilization ratios must exceed
     * {@code preemptionUtilizationThreshold}.
     *
     * @return {@code true} when preemption should be attempted
     */
    private boolean shouldAttemptPreemption() {
        if (!preemptionEnabled) {
            return false;
        }
        float memoryUtilization = ((float) rootMetrics.getAllocatedMB()) / ((float) clusterResource.getMemory());
        float vcoreUtilization = ((float) rootMetrics.getAllocatedVirtualCores()) / ((float) clusterResource.getVirtualCores());
        return preemptionUtilizationThreshold < Math.max(memoryUtilization, vcoreUtilization);
    }

    /**
     * @return the metrics object held in {@code rootMetrics}
     */
    @Override
    public QueueMetrics getRootQueueMetrics() {
        return rootMetrics;
    }

    /**
     * Dispatches a scheduler event to the matching handler method. Every known event
     * type requires the event to be an instance of its concrete event class; a
     * mismatch raises a {@link RuntimeException}. Unknown event types are logged.
     *
     * @param schedulerEvent the event to process
     */
    public void handle(SchedulerEvent schedulerEvent) {
        switch ((SchedulerEventType) schedulerEvent.getType()) {
            case NODE_ADDED: {
                if (!(schedulerEvent instanceof NodeAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeAddedSchedulerEvent nodeAdded = (NodeAddedSchedulerEvent) schedulerEvent;
                addNode(nodeAdded.getAddedRMNode());
                recoverContainersOnNode(nodeAdded.getContainerReports(), nodeAdded.getAddedRMNode());
                break;
            }
            case NODE_REMOVED: {
                if (!(schedulerEvent instanceof NodeRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeRemovedSchedulerEvent nodeRemoved = (NodeRemovedSchedulerEvent) schedulerEvent;
                removeNode(nodeRemoved.getRemovedRMNode());
                break;
            }
            case NODE_UPDATE: {
                if (!(schedulerEvent instanceof NodeUpdateSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeUpdateSchedulerEvent nodeUpdated = (NodeUpdateSchedulerEvent) schedulerEvent;
                nodeUpdate(nodeUpdated.getRMNode());
                break;
            }
            case APP_ADDED: {
                if (!(schedulerEvent instanceof AppAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppAddedSchedulerEvent appAdded = (AppAddedSchedulerEvent) schedulerEvent;
                String queueName = resolveReservationQueueName(appAdded.getQueue(), appAdded.getApplicationId(), appAdded.getReservationID());
                // A null queue name means no queue was resolved, so nothing is added.
                if (queueName != null) {
                    addApplication(appAdded.getApplicationId(), queueName, appAdded.getUser(), appAdded.getIsAppRecovering());
                }
                break;
            }
            case APP_REMOVED: {
                if (!(schedulerEvent instanceof AppRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppRemovedSchedulerEvent appRemoved = (AppRemovedSchedulerEvent) schedulerEvent;
                removeApplication(appRemoved.getApplicationID(), appRemoved.getFinalState());
                break;
            }
            case NODE_RESOURCE_UPDATE: {
                if (!(schedulerEvent instanceof NodeResourceUpdateSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeResourceUpdateSchedulerEvent resourceUpdated = (NodeResourceUpdateSchedulerEvent) schedulerEvent;
                updateNodeResource(resourceUpdated.getRMNode(), resourceUpdated.getResourceOption());
                break;
            }
            case APP_ATTEMPT_ADDED: {
                if (!(schedulerEvent instanceof AppAttemptAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppAttemptAddedSchedulerEvent attemptAdded = (AppAttemptAddedSchedulerEvent) schedulerEvent;
                addApplicationAttempt(attemptAdded.getApplicationAttemptId(), attemptAdded.getTransferStateFromPreviousAttempt(), attemptAdded.getIsAttemptRecovering());
                break;
            }
            case APP_ATTEMPT_REMOVED: {
                if (!(schedulerEvent instanceof AppAttemptRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppAttemptRemovedSchedulerEvent attemptRemoved = (AppAttemptRemovedSchedulerEvent) schedulerEvent;
                removeApplicationAttempt(attemptRemoved.getApplicationAttemptID(), attemptRemoved.getFinalAttemptState(), attemptRemoved.getKeepContainersAcrossAppAttempts());
                break;
            }
            case CONTAINER_EXPIRED: {
                if (!(schedulerEvent instanceof ContainerExpiredSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                ContainerId expiredId = ((ContainerExpiredSchedulerEvent) schedulerEvent).getContainerId();
                completedContainer(getRMContainer(expiredId), SchedulerUtils.createAbnormalContainerStatus(expiredId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE);
                break;
            }
            default: {
                LOG.error("Unknown event arrived at FairScheduler: " + schedulerEvent.toString());
                break;
            }
        }
    }

    /**
     * Maps a submission queue to the queue the application should actually be added
     * to. Queues that are unknown or not reservable are returned unchanged. For a
     * reservable queue, a submission with a reservation id is directed to the
     * reservation's own queue, and one without to the plan's default queue.
     *
     * @param str           queue name given at submission
     * @param applicationId application being submitted (used in rejection events)
     * @param reservationId reservation id, may be {@code null}
     * @return the resolved queue name, or {@code null} when the application was
     *         rejected because the reservation queue does not exist or does not
     *         belong to the given queue
     */
    private synchronized String resolveReservationQueueName(String str, ApplicationId applicationId, ReservationId reservationId) {
        FSQueue planQueue = queueMgr.getQueue(str);
        if (planQueue == null || !allocConf.isReservable(planQueue.getQueueName())) {
            return str;
        }

        String planQueueName = planQueue.getQueueName();
        if (reservationId == null) {
            return getDefaultQueueForPlanQueue(planQueueName);
        }

        String reservationQueueName = planQueueName + CapacitySchedulerConfiguration.DOT + reservationId.toString();
        FSQueue reservationQueue = queueMgr.getQueue(reservationQueueName);
        if (reservationQueue == null) {
            rmContext.getDispatcher().getEventHandler().handle(new RMAppRejectedEvent(applicationId, "Application " + applicationId + " submitted to a reservation which is not yet currently active: " + reservationQueueName));
            return null;
        }

        boolean belongsToPlan = reservationQueue.getParent().getQueueName().equals(planQueueName);
        if (!belongsToPlan) {
            rmContext.getDispatcher().getEventHandler().handle(new RMAppRejectedEvent(applicationId, "Application: " + applicationId + " submitted to a reservation " + reservationQueueName + " which does not belong to the specified queue: " + planQueueName));
            return null;
        }
        return reservationQueueName;
    }

    /**
     * Builds the name of the default child queue of a plan queue: the plan queue
     * name, a separator, the last dot-separated segment of the plan queue name and
     * the default-queue suffix.
     *
     * @param str full name of the plan queue
     * @return the full name of its default child queue
     */
    private String getDefaultQueueForPlanQueue(String str) {
        int lastSeparator = str.lastIndexOf(CapacitySchedulerConfiguration.DOT);
        String shortName = str.substring(lastSeparator + 1);
        return str + CapacitySchedulerConfiguration.DOT + shortName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
    }

    /**
     * Recovery hook; this scheduler does nothing with the given state.
     *
     * @param rMState the recovered state (ignored)
     * @throws Exception declared by the signature, never thrown here
     */
    @Override
    public void recover(RMStateStore.RMState rMState) throws Exception {
        // Intentionally empty.
    }

    /**
     * Stores the resource manager context used by this scheduler.
     *
     * @param rMContext the context to store
     */
    @Override
    public synchronized void setRMContext(RMContext rMContext) {
        // The field and the parameter differ only by letter case; keep the explicit qualifier.
        this.rmContext = rMContext;
    }

    /**
     * Initializes the scheduler from the given configuration.
     *
     * <p>While holding the scheduler lock, wraps the configuration in a
     * {@code FairSchedulerConfiguration}, validates it, copies the individual settings
     * into fields, creates the metrics, event log and allocation configuration,
     * initializes the queue manager and creates (but does not start) the update
     * thread and, if continuous scheduling is enabled, the continuous scheduling
     * thread. After the lock is released, the allocations loader is initialized and
     * asked to load the allocations once.
     *
     * @param configuration the configuration to initialize from
     * @throws IOException if the queue manager / thread setup fails or the initial
     *                     allocations load fails; the original exception is the cause
     */
    private void initScheduler(Configuration configuration) throws IOException {
        synchronized (this) {
            this.conf = new FairSchedulerConfiguration(configuration);
            validateConf(this.conf);
            // Copy the individual settings out of the scheduler configuration.
            this.minimumAllocation = this.conf.getMinimumAllocation();
            initMaximumResourceCapability(this.conf.getMaximumAllocation());
            this.incrAllocation = this.conf.getIncrementAllocation();
            this.continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
            this.continuousSchedulingSleepMs = this.conf.getContinuousSchedulingSleepMs();
            this.nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
            this.rackLocalityThreshold = this.conf.getLocalityThresholdRack();
            this.nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
            this.rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
            this.preemptionEnabled = this.conf.getPreemptionEnabled();
            this.preemptionUtilizationThreshold = this.conf.getPreemptionUtilizationThreshold();
            this.assignMultiple = this.conf.getAssignMultiple();
            this.maxAssign = this.conf.getMaxAssign();
            this.sizeBasedWeight = this.conf.getSizeBasedWeight();
            this.preemptionInterval = this.conf.getPreemptionInterval();
            this.waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
            this.usePortForNodeName = this.conf.getUsePortForNodeName();
            this.updateInterval = this.conf.getUpdateInterval();
            // A negative update interval is replaced by 500 ms and a warning is logged.
            if (this.updateInterval < 0) {
                this.updateInterval = 500L;
                LOG.warn("yarn.scheduler.fair.update-interval-ms is invalid, so using default value 500 ms instead");
            }
            this.rootMetrics = FSQueueMetrics.forQueue("root", (Queue) null, true, configuration);
            this.fsOpDurations = FSOpDurations.getInstance(true);
            this.applications = new ConcurrentHashMap();
            this.eventLog = new FairSchedulerEventLog();
            this.eventLog.init(this.conf);
            this.allocConf = new AllocationConfiguration(configuration);
            try {
                this.queueMgr.initialize(configuration);
                // The threads are only created and configured here; they are not started
                // in this method.
                this.updateThread = new UpdateThread();
                this.updateThread.setName("FairSchedulerUpdateThread");
                this.updateThread.setDaemon(true);
                if (this.continuousSchedulingEnabled) {
                    this.schedulingThread = new ContinuousSchedulingThread();
                    this.schedulingThread.setName("FairSchedulerContinuousScheduling");
                    this.schedulingThread.setDaemon(true);
                }
            } catch (Exception e) {
                throw new IOException("Failed to start FairScheduler", e);
            }
        }
        // NOTE(review): the allocations loader is set up outside the synchronized block;
        // presumably so the reload listener can take the scheduler lock itself — confirm.
        this.allocsLoader.init(configuration);
        this.allocsLoader.setReloadListener(new AllocationReloadListener());
        try {
            this.allocsLoader.reloadAllocations();
        } catch (Exception e2) {
            throw new IOException("Failed to initialize FairScheduler", e2);
        }
    }

    /**
     * Starts the update thread, the continuous scheduling thread (only when
     * continuous scheduling is enabled) and finally the allocations loader. Each
     * component is checked for {@code null} before it is used.
     */
    private synchronized void startSchedulerThreads() {
        Preconditions.checkNotNull(updateThread, "updateThread is null");
        Preconditions.checkNotNull(allocsLoader, "allocsLoader is null");

        updateThread.start();

        if (continuousSchedulingEnabled) {
            Preconditions.checkNotNull(schedulingThread, "schedulingThread is null");
            schedulingThread.start();
        }

        allocsLoader.start();
    }

    /**
     * Initializes the scheduler from the configuration and then delegates to the
     * superclass initialization.
     *
     * @param configuration the service configuration
     * @throws Exception if scheduler or superclass initialization fails
     */
    @Override
    public void serviceInit(Configuration configuration) throws Exception {
        // Scheduler state must be set up before the superclass initializes.
        initScheduler(configuration);
        super.serviceInit(configuration);
    }

    /**
     * Starts the scheduler's background threads and then delegates to the superclass
     * start.
     *
     * @throws Exception if starting the threads or the superclass fails
     */
    public void serviceStart() throws Exception {
        // Background threads first, then the superclass.
        startSchedulerThreads();
        super.serviceStart();
    }

    /**
     * Stops the scheduler: while holding the scheduler lock, interrupts the update
     * thread and (when continuous scheduling is enabled) the scheduling thread,
     * waiting up to one second for each to finish, and stops the allocations loader.
     * Afterwards the superclass stop is invoked.
     *
     * @throws Exception if interrupted while joining a thread or if the superclass
     *                   stop fails
     */
    public void serviceStop() throws Exception {
        synchronized (this) {
            if (updateThread != null) {
                updateThread.interrupt();
                updateThread.join(1000L);
            }

            boolean hasSchedulingThread = continuousSchedulingEnabled && schedulingThread != null;
            if (hasSchedulingThread) {
                schedulingThread.interrupt();
                schedulingThread.join(1000L);
            }

            if (allocsLoader != null) {
                allocsLoader.stop();
            }
        }
        super.serviceStop();
    }

    /**
     * Re-reads the allocations through the allocations loader. A failure is logged
     * and not propagated; the two arguments are not used.
     *
     * @param configuration unused
     * @param rMContext     unused
     * @throws IOException declared by the signature
     */
    @Override
    public void reinitialize(Configuration configuration, RMContext rMContext) throws IOException {
        try {
            allocsLoader.reloadAllocations();
        } catch (Exception reloadFailure) {
            LOG.error("Failed to reload allocations file", reloadFailure);
        }
    }

    /**
     * Returns the queue information of the named queue.
     *
     * @param str name of the queue
     * @param z   first flag forwarded to the queue's {@code getQueueInfo}
     * @param z2  second flag forwarded to the queue's {@code getQueueInfo}
     * @return the queue information
     * @throws IOException if the queue manager does not know the queue
     */
    @Override
    public QueueInfo getQueueInfo(String str, boolean z, boolean z2) throws IOException {
        if (!queueMgr.exists(str)) {
            throw new IOException("queue " + str + " does not exist");
        }
        return queueMgr.getQueue(str).getQueueInfo(z, z2);
    }

    /**
     * Returns the queue ACL information of the current user, collected starting at
     * the root queue.
     *
     * @return the ACL information, or an empty list when it cannot be obtained
     *         because an {@link IOException} was raised; the failure is logged
     *         instead of being dropped silently
     */
    @Override
    public List<QueueUserACLInfo> getQueueUserAclInfo() {
        try {
            return queueMgr.getRootQueue().getQueueUserAclInfo(UserGroupInformation.getCurrentUser());
        } catch (IOException e) {
            // Still best-effort (empty result), but leave a trace so the failure is diagnosable.
            LOG.warn("Failed to retrieve queue user ACL info; returning an empty list", e);
            return new ArrayList<QueueUserACLInfo>();
        }
    }

    /**
     * @return the number of entries currently in the {@code nodes} map
     */
    @Override
    public int getNumClusterNodes() {
        return nodes.size();
    }

    /**
     * Checks whether the user has the given kind of access to the named queue.
     *
     * @param userGroupInformation the user to check
     * @param queueACL             the access type
     * @param str                  the queue name
     * @return the queue's own access decision, or {@code false} when the queue is unknown
     */
    @Override
    public synchronized boolean checkAccess(UserGroupInformation userGroupInformation, QueueACL queueACL, String str) {
        FSQueue queue = getQueueManager().getQueue(str);
        if (queue == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("ACL not found for queue access-type " + queueACL + " for queue " + str);
            }
            return false;
        }
        return queue.hasAccess(queueACL, userGroupInformation);
    }

    /**
     * @return the allocation configuration currently held in {@code allocConf}
     */
    public AllocationConfiguration getAllocationConfiguration() {
        return allocConf;
    }

    /**
     * Collects the application attempts of the named queue.
     *
     * @param str the queue name
     * @return the attempt ids collected by the queue, or {@code null} when the queue
     *         is unknown
     */
    @Override
    public List<ApplicationAttemptId> getAppsInQueue(String str) {
        FSQueue queue = queueMgr.getQueue(str);
        if (queue == null) {
            return null;
        }
        List<ApplicationAttemptId> attemptIds = new ArrayList<ApplicationAttemptId>();
        queue.collectSchedulerApplications(attemptIds);
        return attemptIds;
    }

    /**
     * Moves an application to another leaf queue. If the target is a reservable
     * (plan) queue, the move goes to that plan's default queue. When the application
     * is runnable in its current queue, the move is first verified against the
     * target's constraints.
     *
     * @param applicationId the application to move
     * @param str           name of the target queue
     * @return the name of the queue the application is in after the call
     * @throws YarnException if the application is unknown, has no current attempt,
     *                       the target is not an existing leaf queue, or the move
     *                       would violate queue constraints
     */
    @Override
    public synchronized String moveApplication(ApplicationId applicationId, String str) throws YarnException {
        SchedulerApplication<FSAppAttempt> schedulerApplication = (SchedulerApplication) this.applications.get(applicationId);
        if (schedulerApplication == null) {
            throw new YarnException("App to be moved " + applicationId + " not found.");
        }
        FSAppAttempt currentAppAttempt = schedulerApplication.getCurrentAppAttempt();
        if (currentAppAttempt == null) {
            // Without this guard the synchronized statement below would fail with a
            // NullPointerException for an application that has no attempt yet.
            throw new YarnException("App to be moved " + applicationId + " has no current attempt.");
        }
        // Lock the attempt for the duration of the move.
        synchronized (currentAppAttempt) {
            FSLeafQueue fSLeafQueue = (FSLeafQueue) schedulerApplication.getQueue();
            FSLeafQueue leafQueue = this.queueMgr.getLeafQueue(handleMoveToPlanQueue(str), false);
            if (leafQueue == null) {
                throw new YarnException("Target queue " + str + " not found or is not a leaf queue.");
            }
            if (leafQueue == fSLeafQueue) {
                // Already in the target queue; nothing to move.
                return fSLeafQueue.getQueueName();
            }
            if (fSLeafQueue.isRunnableApp(currentAppAttempt)) {
                verifyMoveDoesNotViolateConstraints(currentAppAttempt, fSLeafQueue, leafQueue);
            }
            executeMove(schedulerApplication, currentAppAttempt, fSLeafQueue, leafQueue);
            return leafQueue.getQueueName();
        }
    }

    /**
     * Verifies that moving the attempt into the target queue respects the
     * max-running-apps and max-share limits of every queue from the target queue up
     * to, but excluding, the lowest common ancestor of the two queues.
     *
     * @param fSAppAttempt the attempt to be moved
     * @param fSLeafQueue  the queue the attempt is currently in
     * @param fSLeafQueue2 the queue the attempt should move to
     * @throws YarnException if a queue on the path is already at (or above) its
     *                       max-running-apps limit, or would exceed its max share
     */
    private void verifyMoveDoesNotViolateConstraints(FSAppAttempt fSAppAttempt, FSLeafQueue fSLeafQueue, FSLeafQueue fSLeafQueue2) throws YarnException {
        String queueName = fSLeafQueue2.getQueueName();
        ApplicationAttemptId applicationAttemptId = fSAppAttempt.getApplicationAttemptId();
        FSQueue lowestCommonAncestor = findLowestCommonAncestorQueue(fSLeafQueue, fSLeafQueue2);
        Resource currentConsumption = fSAppAttempt.getCurrentConsumption();

        // Stop at the common ancestor, or at the top of the hierarchy if the ancestor is
        // never encountered (previously that case ended in a NullPointerException).
        for (FSQueue cur = fSLeafQueue2; cur != null && cur != lowestCommonAncestor; cur = cur.getParent()) {
            // ">=" rather than "==": a queue whose runnable-app count already exceeds its
            // configured limit must reject the move as well.
            if (cur.getNumRunnableApps() >= this.allocConf.getQueueMaxApps(cur.getQueueName())) {
                throw new YarnException("Moving app attempt " + applicationAttemptId + " to queue " + queueName + " would violate queue maxRunningApps constraints on queue " + cur.getQueueName());
            }
            if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), currentConsumption), cur.getMaxShare())) {
                throw new YarnException("Moving app attempt " + applicationAttemptId + " to queue " + queueName + " would violate queue maxShare constraints on queue " + cur.getQueueName());
            }
        }
    }

    /**
     * Performs the actual move of an attempt between two leaf queues and keeps the
     * max-running-apps bookkeeping consistent.
     *
     * @param schedulerApplication the application owning the attempt
     * @param fSAppAttempt         the attempt being moved
     * @param fSLeafQueue          the queue the attempt leaves
     * @param fSLeafQueue2         the queue the attempt joins
     * @throws IllegalStateException if the attempt was runnable in the old queue but
     *                               cannot be runnable in the new one
     */
    private void executeMove(SchedulerApplication<FSAppAttempt> schedulerApplication, FSAppAttempt fSAppAttempt, FSLeafQueue fSLeafQueue, FSLeafQueue fSLeafQueue2) {
        boolean wasRunnable = fSLeafQueue.removeApp(fSAppAttempt);
        boolean runnableInTarget = maxRunningEnforcer.canAppBeRunnable(fSLeafQueue2, fSAppAttempt.getUser());
        if (wasRunnable && !runnableInTarget) {
            throw new IllegalStateException("Should have already verified that app " + fSAppAttempt.getApplicationId() + " would be runnable in new queue");
        }

        // Drop the old tracking entry.
        if (wasRunnable) {
            maxRunningEnforcer.untrackRunnableApp(fSAppAttempt);
        } else if (runnableInTarget) {
            maxRunningEnforcer.untrackNonRunnableApp(fSAppAttempt);
        }

        // Re-home the attempt.
        fSAppAttempt.move(fSLeafQueue2);
        schedulerApplication.setQueue(fSLeafQueue2);
        fSLeafQueue2.addApp(fSAppAttempt, runnableInTarget);

        if (runnableInTarget) {
            maxRunningEnforcer.trackRunnableApp(fSAppAttempt);
        }
        if (wasRunnable) {
            maxRunningEnforcer.updateRunnabilityOnAppRemoval(fSAppAttempt, fSLeafQueue);
        }
    }

    /**
     * Finds the lowest common ancestor of two queues by comparing their
     * dot-separated names: the names are scanned up to the first position where they
     * differ, and the prefix ending before the last period seen so far names the
     * ancestor (so {@code root.applepie} and {@code root.appletart} yield
     * {@code root}, not {@code root.apple}).
     *
     * <p>Compared with a plain prefix scan this also handles two edge cases: when
     * one queue's name is a proper path prefix of the other's, that queue itself is
     * returned; and when the names share no period at all, the root queue is
     * returned instead of failing with a {@link StringIndexOutOfBoundsException}.
     *
     * @param fSQueue  the first queue
     * @param fSQueue2 the second queue
     * @return the lowest common ancestor; {@code fSQueue} itself when the names are
     *         identical
     */
    @VisibleForTesting
    FSQueue findLowestCommonAncestorQueue(FSQueue fSQueue, FSQueue fSQueue2) {
        String name = fSQueue.getName();
        String name2 = fSQueue2.getName();
        int shorterLength = Math.min(name.length(), name2.length());

        // Advance over the common character prefix, remembering the last period in it.
        int lastPeriodIndex = -1;
        int i = 0;
        while (i < shorterLength && name.charAt(i) == name2.charAt(i)) {
            if (name.charAt(i) == '.') {
                lastPeriodIndex = i;
            }
            i++;
        }

        if (i == name.length() && i == name2.length()) {
            return fSQueue; // names are identical
        }
        if (i == name.length() && name2.charAt(i) == '.') {
            return fSQueue; // first queue is an ancestor of the second
        }
        if (i == name2.length() && name.charAt(i) == '.') {
            return fSQueue2; // second queue is an ancestor of the first
        }
        if (lastPeriodIndex < 0) {
            // No shared period: substring(0, -1) would throw. Fall back to the root of
            // the hierarchy (assumes every managed queue descends from it).
            return this.queueMgr.getRootQueue();
        }
        return this.queueMgr.getQueue(name.substring(0, lastPeriodIndex));
    }

    /**
     * Applies a resource change of a node through the superclass and then refreshes
     * the root queue metrics and the steady fair shares.
     *
     * @param rMNode         the node whose resource changed
     * @param resourceOption the new resource option
     */
    @Override
    public synchronized void updateNodeResource(RMNode rMNode, ResourceOption resourceOption) {
        super.updateNodeResource(rMNode, resourceOption);
        updateRootQueueMetrics();

        // Steady shares depend on the cluster resource, so recompute them.
        queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
        queueMgr.getRootQueue().recomputeSteadyShares();
    }

    /**
     * @return the resource types this scheduler reports: MEMORY, CPU and DISK
     */
    @Override
    public EnumSet<YarnServiceProtos.SchedulerResourceTypes> getSchedulingResourceTypes() {
        EnumSet<YarnServiceProtos.SchedulerResourceTypes> resourceTypes = EnumSet.of(
                YarnServiceProtos.SchedulerResourceTypes.MEMORY,
                YarnServiceProtos.SchedulerResourceTypes.CPU,
                YarnServiceProtos.SchedulerResourceTypes.DISK);
        return resourceTypes;
    }

    /**
     * Collects the names of all queues that the allocation configuration marks as
     * reservable.
     *
     * @return the names of the reservable queues
     * @throws YarnException declared by the signature
     */
    @Override
    public Set<String> getPlanQueues() throws YarnException {
        Set<String> planQueueNames = new HashSet<String>();
        for (FSQueue queue : queueMgr.getQueues()) {
            String queueName = queue.getName();
            if (allocConf.isReservable(queueName)) {
                planQueueNames.add(queueName);
            }
        }
        return planQueueNames;
    }

    /**
     * Sets the weights of a leaf queue from the capacity of the given entitlement.
     *
     * @param str              name of the leaf queue
     * @param queueEntitlement entitlement whose capacity becomes the queue's weights
     * @throws YarnException if no leaf queue with that name exists
     */
    @Override
    public void setEntitlement(String str, QueueEntitlement queueEntitlement) throws YarnException {
        FSLeafQueue target = queueMgr.getLeafQueue(str, false);
        if (target != null) {
            target.setWeights(queueEntitlement.getCapacity());
            return;
        }
        throw new YarnException("Target queue " + str + " not found or is not a leaf queue.");
    }

    /**
     * Removes the named leaf queue. Nothing happens when no such leaf queue exists.
     *
     * @param str name of the queue to remove
     * @throws YarnException if the leaf queue exists but the queue manager refuses
     *                       to remove it
     */
    @Override
    public void removeQueue(String str) throws YarnException {
        if (queueMgr.getLeafQueue(str, false) == null) {
            return;
        }
        if (!queueMgr.removeLeafQueue(str)) {
            throw new YarnException("Could not remove queue " + str + " as its either not a leaf queue or its not empty");
        }
    }

    /**
     * Redirects a move that targets a reservable (plan) queue to that plan's default
     * queue; any other target name is returned unchanged.
     *
     * @param str requested target queue name
     * @return the queue name the move should actually use
     */
    private String handleMoveToPlanQueue(String str) {
        FSQueue target = queueMgr.getQueue(str);
        boolean isPlanQueue = target != null && allocConf.isReservable(target.getQueueName());
        return isPlanQueue ? getDefaultQueueForPlanQueue(str) : str;
    }
}
