/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.metrics.OpportunisticSchedulerMetrics;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedOpportunisticContainerAllocator
extends OpportunisticContainerAllocator {
    private static final int NODE_LOCAL_LOOP = 0;
    private static final int RACK_LOCAL_LOOP = 1;
    private static final int OFF_SWITCH_LOOP = 2;
    private static final Logger LOG = LoggerFactory.getLogger(DistributedOpportunisticContainerAllocator.class);

    public DistributedOpportunisticContainerAllocator(BaseContainerTokenSecretManager tokenSecretManager) {
        super(tokenSecretManager);
    }

    public DistributedOpportunisticContainerAllocator(BaseContainerTokenSecretManager tokenSecretManager, int maxAllocationsPerAMHeartbeat) {
        super(tokenSecretManager, maxAllocationsPerAMHeartbeat);
    }

    @Override
    public List<Container> allocateContainers(ResourceBlacklistRequest blackList, List<ResourceRequest> oppResourceReqs, ApplicationAttemptId applicationAttemptId, OpportunisticContainerContext opportContext, long rmIdentifier, String appSubmitter) throws YarnException {
        this.updateBlacklist(blackList, opportContext);
        opportContext.addToOutstandingReqs(oppResourceReqs);
        HashSet<String> nodeBlackList = new HashSet<String>(opportContext.getBlacklist());
        HashSet<String> allocatedNodes = new HashSet<String>();
        ArrayList<Container> allocatedContainers = new ArrayList<Container>();
        boolean continueLoop = true;
        while (continueLoop) {
            continueLoop = false;
            ArrayList<Map<Resource, List<OpportunisticContainerAllocator.Allocation>>> allocations = new ArrayList<Map<Resource, List<OpportunisticContainerAllocator.Allocation>>>();
            for (SchedulerRequestKey schedulerKey : opportContext.getOutstandingOpReqs().descendingKeySet()) {
                int remAllocs = -1;
                int maxAllocationsPerAMHeartbeat = this.getMaxAllocationsPerAMHeartbeat();
                if (maxAllocationsPerAMHeartbeat > 0 && (remAllocs = maxAllocationsPerAMHeartbeat - allocatedContainers.size() - this.getTotalAllocations(allocations)) <= 0) {
                    LOG.info("Not allocating more containers as we have reached max allocations per AM heartbeat {}", (Object)maxAllocationsPerAMHeartbeat);
                    break;
                }
                Map<Resource, List<OpportunisticContainerAllocator.Allocation>> allocation = this.allocate(rmIdentifier, opportContext, schedulerKey, applicationAttemptId, appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
                if (allocation.size() <= 0) continue;
                allocations.add(allocation);
                continueLoop = true;
            }
            this.matchAllocation(allocations, allocatedContainers, opportContext);
        }
        return allocatedContainers;
    }

    private Map<Resource, List<OpportunisticContainerAllocator.Allocation>> allocate(long rmIdentifier, OpportunisticContainerContext appContext, SchedulerRequestKey schedKey, ApplicationAttemptId appAttId, String userName, Set<String> blackList, Set<String> allocatedNodes, int maxAllocations) throws YarnException {
        HashMap<Resource, List<OpportunisticContainerAllocator.Allocation>> containers = new HashMap<Resource, List<OpportunisticContainerAllocator.Allocation>>();
        for (OpportunisticContainerAllocator.EnrichedResourceRequest enrichedAsk : appContext.getOutstandingOpReqs().get(schedKey).values()) {
            int remainingAllocs = -1;
            if (maxAllocations > 0) {
                int totalAllocated = 0;
                for (List allocs : containers.values()) {
                    totalAllocated += allocs.size();
                }
                remainingAllocs = maxAllocations - totalAllocated;
                if (remainingAllocs <= 0) {
                    LOG.info("Not allocating more containers as max allocations per AM heartbeat {} has reached", (Object)this.getMaxAllocationsPerAMHeartbeat());
                    break;
                }
            }
            this.allocateContainersInternal(rmIdentifier, appContext.getAppParams(), appContext.getContainerIdGenerator(), blackList, allocatedNodes, appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk, remainingAllocs);
            ResourceRequest anyAsk = enrichedAsk.getRequest();
            if (containers.isEmpty()) continue;
            LOG.info("Opportunistic allocation requested for [priority={}, allocationRequestId={}, num_containers={}, capability={}] allocated = {}", new Object[]{anyAsk.getPriority(), anyAsk.getAllocationRequestId(), anyAsk.getNumContainers(), anyAsk.getCapability(), containers.keySet()});
        }
        return containers;
    }

    private void allocateContainersInternal(long rmIdentifier, OpportunisticContainerAllocator.AllocationParams appParams, OpportunisticContainerAllocator.ContainerIdGenerator idCounter, Set<String> blacklist, Set<String> allocatedNodes, ApplicationAttemptId id, Map<String, RemoteNode> allNodes, String userName, Map<Resource, List<OpportunisticContainerAllocator.Allocation>> allocations, OpportunisticContainerAllocator.EnrichedResourceRequest enrichedAsk, int maxAllocations) throws YarnException {
        if (allNodes.size() == 0) {
            LOG.info("No nodes currently available to allocate OPPORTUNISTIC containers.");
            return;
        }
        ResourceRequest anyAsk = enrichedAsk.getRequest();
        int toAllocate = anyAsk.getNumContainers() - (allocations.isEmpty() ? 0 : allocations.get(anyAsk.getCapability()).size());
        toAllocate = Math.min(toAllocate, appParams.getMaxAllocationsPerSchedulerKeyPerRound());
        if (maxAllocations >= 0) {
            toAllocate = Math.min(maxAllocations, toAllocate);
        }
        int numAllocated = 0;
        int loopIndex = 2;
        if (enrichedAsk.getNodeMap().size() > 0) {
            loopIndex = 0;
        }
        while (numAllocated < toAllocate) {
            Collection<RemoteNode> nodeCandidates = this.findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes, enrichedAsk);
            for (RemoteNode rNode : nodeCandidates) {
                String rNodeHost = rNode.getNodeId().getHost();
                if (blacklist.contains(rNodeHost)) {
                    LOG.info("Nodes for scheduling has a blacklisted node [" + rNodeHost + "]..");
                    continue;
                }
                String location = "*";
                if (loopIndex == 0) {
                    if (!enrichedAsk.getNodeMap().containsKey(rNodeHost)) continue;
                    location = rNodeHost;
                } else if (allocatedNodes.contains(rNodeHost)) {
                    LOG.info("Opportunistic container has already been allocated on {}.", (Object)rNodeHost);
                    continue;
                }
                if (loopIndex == 1) {
                    if (!enrichedAsk.getRackMap().containsKey(rNode.getRackName())) continue;
                    location = rNode.getRackName();
                }
                Container container = this.createContainer(rmIdentifier, appParams, idCounter, id, userName, allocations, location, anyAsk, rNode);
                this.updateMetrics(loopIndex);
                allocatedNodes.add(rNodeHost);
                LOG.info("Allocated [" + container.getId() + "] as opportunistic at location [" + location + "]");
                if (++numAllocated < toAllocate) continue;
                break;
            }
            loopIndex = loopIndex == 0 && enrichedAsk.getRackMap().size() > 0 ? 1 : ++loopIndex;
            if (loopIndex <= 2 || numAllocated != 0) continue;
            LOG.warn("Unable to allocate any opportunistic containers.");
            break;
        }
    }

    private void updateMetrics(int loopIndex) {
        OpportunisticSchedulerMetrics metrics = OpportunisticSchedulerMetrics.getMetrics();
        if (loopIndex == 0) {
            metrics.incrNodeLocalOppContainers();
        } else if (loopIndex == 1) {
            metrics.incrRackLocalOppContainers();
        } else {
            metrics.incrOffSwitchOppContainers();
        }
    }

    private Collection<RemoteNode> findNodeCandidates(int loopIndex, Map<String, RemoteNode> allNodes, Set<String> blackList, Set<String> allocatedNodes, OpportunisticContainerAllocator.EnrichedResourceRequest enrichedRR) {
        LinkedList<RemoteNode> retList = new LinkedList<RemoteNode>();
        String partition = this.getRequestPartition(enrichedRR);
        if (loopIndex > 1) {
            for (RemoteNode remoteNode : allNodes.values()) {
                if (!StringUtils.equals((CharSequence)partition, (CharSequence)this.getRemoteNodePartition(remoteNode))) continue;
                retList.add(remoteNode);
            }
            return retList;
        }
        int numContainers = enrichedRR.getRequest().getNumContainers();
        while (numContainers > 0 && (numContainers = loopIndex == 0 ? this.collectNodeLocalCandidates(allNodes, enrichedRR, retList, numContainers) : this.collectRackLocalCandidates(allNodes, enrichedRR, retList, blackList, allocatedNodes, numContainers)) != enrichedRR.getRequest().getNumContainers()) {
        }
        return retList;
    }

    private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes, OpportunisticContainerAllocator.EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList, Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
        String partition = this.getRequestPartition(enrichedRR);
        for (RemoteNode rNode : allNodes.values()) {
            if (StringUtils.equals((CharSequence)partition, (CharSequence)this.getRemoteNodePartition(rNode)) && enrichedRR.getRackMap().containsKey(rNode.getRackName())) {
                String rHost = rNode.getNodeId().getHost();
                if (blackList.contains(rHost)) continue;
                if (allocatedNodes.contains(rHost)) {
                    retList.addLast(rNode);
                } else {
                    retList.addFirst(rNode);
                    --numContainers;
                }
            }
            if (numContainers != 0) continue;
            break;
        }
        return numContainers;
    }

    private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes, OpportunisticContainerAllocator.EnrichedResourceRequest enrichedRR, List<RemoteNode> retList, int numContainers) {
        String partition = this.getRequestPartition(enrichedRR);
        for (String nodeName : enrichedRR.getNodeMap().keySet()) {
            RemoteNode remoteNode = allNodes.get(nodeName);
            if (remoteNode != null && StringUtils.equals((CharSequence)partition, (CharSequence)this.getRemoteNodePartition(remoteNode))) {
                retList.add(remoteNode);
                --numContainers;
            }
            if (numContainers != 0) continue;
            break;
        }
        return numContainers;
    }
}

