/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.DefaultSubClusterResolverImpl;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestLocalityMulticastAMRMProxyPolicy
extends BaseFederationPoliciesTest {
    public static final Logger LOG = LoggerFactory.getLogger(TestLocalityMulticastAMRMProxyPolicy.class);

    @Before
    public void setUp() throws Exception {
        this.setPolicy((ConfigurableFederationPolicy)new TestableLocalityMulticastAMRMProxyPolicy());
        this.setPolicyInfo(new WeightedPolicyInfo());
        HashMap<SubClusterIdInfo, Float> routerWeights = new HashMap<SubClusterIdInfo, Float>();
        HashMap<SubClusterIdInfo, Float> amrmWeights = new HashMap<SubClusterIdInfo, Float>();
        for (int i = 0; i < 6; ++i) {
            SubClusterIdInfo sc = new SubClusterIdInfo("subcluster" + i);
            if (i != 3) {
                SubClusterInfo sci = SubClusterInfo.newInstance((SubClusterId)sc.toId(), (String)"dns1:80", (String)"dns1:81", (String)"dns1:82", (String)"dns1:83", (SubClusterState)SubClusterState.SC_RUNNING, (long)System.currentTimeMillis(), (String)"something");
                this.getActiveSubclusters().put(sc.toId(), sci);
            }
            float weight = 0.1f;
            routerWeights.put(sc, Float.valueOf(weight));
            amrmWeights.put(sc, Float.valueOf(weight));
            if (i != 4) continue;
            routerWeights.put(sc, Float.valueOf(0.0f));
            amrmWeights.put(sc, Float.valueOf(0.0f));
        }
        this.getPolicyInfo().setRouterPolicyWeights(routerWeights);
        this.getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
        this.getPolicyInfo().setHeadroomAlpha(0.5f);
        this.setHomeSubCluster(SubClusterId.newInstance((String)"homesubcluster"));
    }

    @Override
    @Test
    public void testReinitilialize() throws YarnException {
        this.initializePolicy();
    }

    private void initializePolicy() throws YarnException {
        this.initializePolicy((Configuration)new YarnConfiguration());
    }

    private void initializePolicy(Configuration conf) throws YarnException {
        this.setFederationPolicyContext(new FederationPolicyInitializationContext());
        SubClusterResolver resolver = FederationPoliciesTestUtil.initResolver();
        this.getFederationPolicyContext().setFederationSubclusterResolver(resolver);
        ByteBuffer buf = this.getPolicyInfo().toByteBuffer();
        this.getFederationPolicyContext().setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration.newInstance((String)"queue1", (String)this.getPolicy().getClass().getCanonicalName(), (ByteBuffer)buf));
        this.getFederationPolicyContext().setHomeSubcluster(this.getHomeSubCluster());
        FederationPoliciesTestUtil.initializePolicyContext(this.getFederationPolicyContext(), this.getPolicy(), this.getPolicyInfo(), this.getActiveSubclusters(), conf);
    }

    @Test(expected=FederationPolicyInitializationException.class)
    public void testNullWeights() throws Exception {
        this.getPolicyInfo().setAMRMPolicyWeights(null);
        this.initializePolicy();
        Assert.fail();
    }

    @Test(expected=FederationPolicyInitializationException.class)
    public void testEmptyWeights() throws Exception {
        this.getPolicyInfo().setAMRMPolicyWeights(new HashMap());
        this.initializePolicy();
        Assert.fail();
    }

    @Test
    public void testSplitBasedOnHeadroom() throws Exception {
        this.getPolicyInfo().setHeadroomAlpha(1.0f);
        this.initializePolicy();
        List<ResourceRequest> resourceRequests = this.createSimpleRequest();
        this.prepPolicyWithHeadroom(true);
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        LOG.info("Initial headroom");
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, 60L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 15L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 25L);
        this.checkTotalContainerAllocation(response, 100L);
        AllocateResponse ar = this.getAllocateResponseWithTargetHeadroom(40);
        ((FederationAMRMProxyPolicy)this.getPolicy()).notifyOfResponse(SubClusterId.newInstance((String)"subcluster2"), ar);
        response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        LOG.info("After headroom update");
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, 37L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 37L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 25L);
        this.checkTotalContainerAllocation(response, 100L);
    }

    @Test(timeout=5000L)
    public void testStressPolicy() throws Exception {
        this.getPolicyInfo().setHeadroomAlpha(1.0f);
        this.initializePolicy();
        this.addHomeSubClusterAsActive();
        int numRR = 1000;
        List<ResourceRequest> resourceRequests = this.createLargeRandomList(numRR);
        this.prepPolicyWithHeadroom(true);
        int numIterations = 1000;
        long tstart = System.currentTimeMillis();
        for (int i = 0; i < numIterations; ++i) {
            Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
            this.validateSplit(response, resourceRequests);
        }
        long tend = System.currentTimeMillis();
        LOG.info("Performed " + numIterations + " policy invocations (and validations) in " + (tend - tstart) + "ms");
    }

    @Test
    public void testFWDAllZeroANY() throws Exception {
        this.getPolicyInfo().setHeadroomAlpha(0.5f);
        this.initializePolicy();
        List<ResourceRequest> resourceRequests = this.createZeroSizedANYRequest();
        this.prepPolicyWithHeadroom(true);
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, 0L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, 0L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 0L);
        this.checkExpectedAllocation(response, "subcluster3", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster4", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster5", -1L, -1L);
        this.checkTotalContainerAllocation(response, 0L);
    }

    @Test
    public void testSplitBasedOnHeadroomAndWeights() throws Exception {
        this.getPolicyInfo().setHeadroomAlpha(0.5f);
        this.initializePolicy();
        List<ResourceRequest> resourceRequests = this.createSimpleRequest();
        this.prepPolicyWithHeadroom(true);
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, 42L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, 12L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 20L);
        this.checkExpectedAllocation(response, "subcluster3", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster4", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 25L);
        this.checkTotalContainerAllocation(response, 100L);
    }

    private void prepPolicyWithHeadroom(boolean setSubCluster0) throws YarnException {
        AllocateResponse ar = this.getAllocateResponseWithTargetHeadroom(40);
        if (setSubCluster0) {
            ((FederationAMRMProxyPolicy)this.getPolicy()).notifyOfResponse(SubClusterId.newInstance((String)"subcluster0"), ar);
        }
        ar = this.getAllocateResponseWithTargetHeadroom(0);
        ((FederationAMRMProxyPolicy)this.getPolicy()).notifyOfResponse(SubClusterId.newInstance((String)"subcluster1"), ar);
        ar = this.getAllocateResponseWithTargetHeadroom(10);
        ((FederationAMRMProxyPolicy)this.getPolicy()).notifyOfResponse(SubClusterId.newInstance((String)"subcluster2"), ar);
    }

    private AllocateResponse getAllocateResponseWithTargetHeadroom(int numContainers) {
        return AllocateResponse.newInstance((int)0, null, null, Collections.emptyList(), (Resource)Resource.newInstance((int)(numContainers * 1024), (int)numContainers), null, (int)10, null, Collections.emptyList());
    }

    private void addHomeSubClusterAsActive() {
        SubClusterId homeSubCluster = this.getHomeSubCluster();
        SubClusterInfo sci = SubClusterInfo.newInstance((SubClusterId)homeSubCluster, (String)"dns1:80", (String)"dns1:81", (String)"dns1:82", (String)"dns1:83", (SubClusterState)SubClusterState.SC_RUNNING, (long)System.currentTimeMillis(), (String)"something");
        this.getActiveSubclusters().put(homeSubCluster, sci);
        SubClusterIdInfo sc = new SubClusterIdInfo(homeSubCluster.getId());
        this.getPolicyInfo().getRouterPolicyWeights().put(sc, Float.valueOf(0.1f));
        this.getPolicyInfo().getAMRMPolicyWeights().put(sc, Float.valueOf(0.1f));
    }

    @Test
    public void testSplitAllocateRequest() throws Exception {
        this.initializePolicy();
        this.addHomeSubClusterAsActive();
        FederationPoliciesTestUtil.initializePolicyContext(this.getFederationPolicyContext(), this.getPolicy(), this.getPolicyInfo(), this.getActiveSubclusters(), new Configuration());
        List<ResourceRequest> resourceRequests = this.createComplexRequest();
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        this.validateSplit(response, resourceRequests);
        this.prettyPrintRequests(response);
        this.checkExpectedAllocation(response, this.getHomeSubCluster().getId(), 7L, 29L);
        this.checkExpectedAllocation(response, "subcluster0", 10L, 32L);
        this.checkExpectedAllocation(response, "subcluster1", 5L, 26L);
        this.checkExpectedAllocation(response, "subcluster2", 4L, 23L);
        this.checkExpectedAllocation(response, "subcluster3", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster4", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 20L);
        this.checkTotalContainerAllocation(response, 130L);
        for (ResourceRequest rr : (List)response.get(this.getHomeSubCluster())) {
            Assert.assertTrue((rr.getAllocationRequestId() == 2L || rr.getAllocationRequestId() == 4L || rr.getAllocationRequestId() == 5L ? 1 : 0) != 0);
        }
        List rrs = (List)response.get(SubClusterId.newInstance((String)"subcluster0"));
        for (ResourceRequest rr : rrs) {
            Assert.assertTrue((rr.getAllocationRequestId() != 1L ? 1 : 0) != 0);
            Assert.assertTrue((rr.getAllocationRequestId() != 4L ? 1 : 0) != 0);
        }
        for (ResourceRequest rr : (List)response.get(SubClusterId.newInstance((String)"subcluster1"))) {
            Assert.assertTrue((rr.getAllocationRequestId() == 1L || rr.getAllocationRequestId() == 2L ? 1 : 0) != 0);
        }
        for (ResourceRequest rr : (List)response.get(SubClusterId.newInstance((String)"subcluster2"))) {
            Assert.assertTrue((rr.getAllocationRequestId() == 1L || rr.getAllocationRequestId() == 2L ? 1 : 0) != 0);
        }
        for (ResourceRequest rr : (List)response.get(SubClusterId.newInstance((String)"subcluster5"))) {
            Assert.assertTrue((rr.getAllocationRequestId() == 2L ? 1 : 0) != 0);
            Assert.assertTrue((boolean)rr.getRelaxLocality());
        }
    }

    private void checkExpectedAllocation(Map<SubClusterId, List<ResourceRequest>> response, String subCluster, long totResourceRequests, long minimumTotalContainers) {
        if (minimumTotalContainers == -1L) {
            Assert.assertNull(response.get(SubClusterId.newInstance((String)subCluster)));
        } else {
            SubClusterId sc = SubClusterId.newInstance((String)subCluster);
            Assert.assertEquals((long)totResourceRequests, (long)response.get(sc).size());
            long actualContCount = 0L;
            for (ResourceRequest rr : response.get(sc)) {
                actualContCount += (long)rr.getNumContainers();
            }
            Assert.assertTrue((String)("Actual count " + actualContCount + " should be at least " + minimumTotalContainers), (minimumTotalContainers <= actualContCount ? 1 : 0) != 0);
        }
    }

    private void checkTotalContainerAllocation(Map<SubClusterId, List<ResourceRequest>> response, long totalContainers) {
        long actualContCount = 0L;
        for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response.entrySet()) {
            for (ResourceRequest rr : entry.getValue()) {
                actualContCount += (long)rr.getNumContainers();
            }
        }
        Assert.assertEquals((long)totalContainers, (long)actualContCount);
    }

    private void validateSplit(Map<SubClusterId, List<ResourceRequest>> split, List<ResourceRequest> original) throws YarnException {
        SubClusterResolver resolver = this.getFederationPolicyContext().getFederationSubclusterResolver();
        int numUsedSubclusters = split.size();
        HashSet<Long> originalIds = new HashSet<Long>();
        HashSet<Long> splitIds = new HashSet<Long>();
        int originalContainers = 0;
        for (ResourceRequest rr : original) {
            originalContainers += rr.getNumContainers();
            originalIds.add(rr.getAllocationRequestId());
        }
        int splitContainers = 0;
        for (Map.Entry<SubClusterId, List<ResourceRequest>> rrs : split.entrySet()) {
            for (ResourceRequest rr : rrs.getValue()) {
                splitContainers += rr.getNumContainers();
                splitIds.add(rr.getAllocationRequestId());
                SubClusterId fid = null;
                try {
                    fid = resolver.getSubClusterForNode(rr.getResourceName());
                }
                catch (YarnException yarnException) {
                    // empty catch block
                }
                if (rrs.getKey().equals((Object)this.getHomeSubCluster()) || fid == null || fid.equals((Object)rrs.getKey())) continue;
                Assert.fail((String)"A node-local (or resolvable rack-local) RR should not be send to an RM other than what it resolves to.");
            }
        }
        Assert.assertEquals(originalIds, splitIds);
        Assert.assertTrue((String)(" Containers requested (" + splitContainers + ") should not exceed the original count of containers (" + originalContainers + ") by more than the number of subclusters (" + numUsedSubclusters + ")"), (originalContainers + numUsedSubclusters >= splitContainers ? 1 : 0) != 0);
        for (SubClusterId targetId : split.keySet()) {
            Assert.assertTrue((String)("Target subcluster " + targetId + " should be in the active set"), (boolean)this.getActiveSubclusters().containsKey(targetId));
            Assert.assertTrue((String)("Target subclusters (" + targetId + ") should have weight >0 in the policy "), (((Float)this.getPolicyInfo().getRouterPolicyWeights().get(new SubClusterIdInfo(targetId))).floatValue() > 0.0f ? 1 : 0) != 0);
        }
    }

    private void prettyPrintRequests(Map<SubClusterId, List<ResourceRequest>> response) {
        for (Map.Entry<SubClusterId, List<ResourceRequest>> entry : response.entrySet()) {
            String str = "";
            for (ResourceRequest rr : entry.getValue()) {
                str = str + " [id:" + rr.getAllocationRequestId() + " loc:" + rr.getResourceName() + " numCont:" + rr.getNumContainers() + "], ";
            }
            LOG.info(entry.getKey() + " --> " + str);
        }
    }

    private List<ResourceRequest> createLargeRandomList(int numRR) throws Exception {
        ArrayList<ResourceRequest> out = new ArrayList<ResourceRequest>();
        Random rand = new Random(1L);
        DefaultSubClusterResolverImpl resolver = (DefaultSubClusterResolverImpl)this.getFederationPolicyContext().getFederationSubclusterResolver();
        ArrayList nodes = new ArrayList(resolver.getNodeToSubCluster().keySet());
        for (int i = 0; i < numRR; ++i) {
            String nodeName = (String)nodes.get(rand.nextInt(nodes.size()));
            long allocationId = rand.nextInt(20);
            out.add(FederationPoliciesTestUtil.createResourceRequest(allocationId, nodeName, 1024, 1, 1, rand.nextInt(100), null, rand.nextBoolean()));
        }
        return out;
    }

    private List<ResourceRequest> createSimpleRequest() throws Exception {
        ArrayList<ResourceRequest> out = new ArrayList<ResourceRequest>();
        out.add(FederationPoliciesTestUtil.createResourceRequest(0L, "*", 1024, 1, 1, 100, null, true));
        return out;
    }

    private List<ResourceRequest> createZeroSizedANYRequest() throws Exception {
        ArrayList<ResourceRequest> out = new ArrayList<ResourceRequest>();
        out.add(FederationPoliciesTestUtil.createResourceRequest(0L, "*", 1024, 1, 1, 0, null, true));
        return out;
    }

    private List<ResourceRequest> createComplexRequest() throws Exception {
        ArrayList<ResourceRequest> out = new ArrayList<ResourceRequest>();
        out.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(0L, "*", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "subcluster1-rack1-host1", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "subcluster1-rack1-host2", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "subcluster2-rack3-host3", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "subcluster1-rack1", 1024, 1, 1, 2, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "subcluster2-rack3", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(1L, "*", 1024, 1, 1, 3, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(2L, "*", 1024, 1, 1, 100, null, true));
        out.add(FederationPoliciesTestUtil.createResourceRequest(3L, "subcluster0-rack0-host0", 1024, 1, 1, 1, null, true));
        out.add(FederationPoliciesTestUtil.createResourceRequest(3L, "subcluster0-rack0", 1024, 1, 1, 1, null, true));
        out.add(FederationPoliciesTestUtil.createResourceRequest(3L, "*", 1024, 1, 1, 1, null, true));
        out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownNode", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "unknownRack", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(4L, "*", 1024, 1, 1, 1, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "subcluster0-rack0-host0", 1024, 1, 1, 2, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "subcluster0-rack0", 1024, 1, 1, 2, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "node4", 1024, 1, 1, 2, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "rack2", 1024, 1, 1, 2, null, false));
        out.add(FederationPoliciesTestUtil.createResourceRequest(5L, "*", 1024, 1, 1, 4, null, false));
        return out;
    }

    public String printList(ArrayList<Integer> list) {
        StringBuilder sb = new StringBuilder();
        for (Integer entry : list) {
            sb.append(entry + ", ");
        }
        return sb.toString();
    }

    @Test
    public void testIntegerAssignment() throws YarnException {
        float[] weights = new float[]{0.0f, 0.1f, 0.2f, 0.2f, -0.1f, 0.1f, 0.2f, 0.1f, 0.1f};
        int[] expectedMin = new int[]{0, 1, 3, 3, 0, 1, 3, 1, 1};
        ArrayList<Float> weightsList = new ArrayList<Float>();
        for (float weight : weights) {
            weightsList.add(Float.valueOf(weight));
        }
        LocalityMulticastAMRMProxyPolicy policy = (LocalityMulticastAMRMProxyPolicy)this.getPolicy();
        for (int i = 0; i < 500000; ++i) {
            ArrayList allocations = policy.computeIntegerAssignment(19, weightsList);
            int sum = 0;
            for (int j = 0; j < weights.length; ++j) {
                sum += ((Integer)allocations.get(j)).intValue();
                if ((Integer)allocations.get(j) >= expectedMin[j]) continue;
                Assert.fail((String)(allocations.get(j) + " at index " + j + " should be at least " + expectedMin[j] + ". Allocation array: " + this.printList(allocations)));
            }
            Assert.assertEquals((String)("Expect sum to be 19 in array: " + this.printList(allocations)), (long)19L, (long)sum);
        }
    }

    @Test
    public void testCancelWithLocalizedResource() throws YarnException {
        this.getPolicyInfo().setHeadroomAlpha(1.0f);
        this.initializePolicy();
        ArrayList<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
        this.prepPolicyWithHeadroom(true);
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0-host0", 1024, 1, 1, 1, null, false));
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0", 1024, 1, 1, 1, null, false));
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "*", 1024, 1, 1, 0, null, false));
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        this.checkExpectedAllocation(response, "subcluster0", 3L, 1L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, 0L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 0L);
        this.checkExpectedAllocation(response, "subcluster3", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster4", -1L, -1L);
        this.checkExpectedAllocation(response, "subcluster5", -1L, -1L);
        resourceRequests.clear();
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0-host0", 1024, 1, 1, 0, null, false));
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "subcluster0-rack0", 1024, 1, 1, 0, null, false));
        resourceRequests.add(FederationPoliciesTestUtil.createResourceRequest(0L, "*", 1024, 1, 1, 100, null, false));
        response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, new HashSet());
        this.checkExpectedAllocation(response, "subcluster0", 3L, 60L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 15L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 25L);
        this.checkTotalContainerAllocation(response, 100L);
    }

    @Test
    public void testSubClusterExpiry() throws Exception {
        this.getPolicyInfo().setHeadroomAlpha(1.0f);
        YarnConfiguration conf = new YarnConfiguration();
        conf.setLong("yarn.federation.amrmproxy.subcluster.timeout.ms", 500L);
        this.initializePolicy((Configuration)conf);
        List<ResourceRequest> resourceRequests = this.createSimpleRequest();
        this.prepPolicyWithHeadroom(true);
        HashSet<SubClusterId> expiredSCList = new HashSet<SubClusterId>();
        Map response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, expiredSCList);
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, 60L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 15L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, 25L);
        this.checkTotalContainerAllocation(response, 100L);
        Thread.sleep(800L);
        expiredSCList.add(SubClusterId.newInstance((String)"subcluster0"));
        expiredSCList.add(SubClusterId.newInstance((String)"subcluster5"));
        response = ((FederationAMRMProxyPolicy)this.getPolicy()).splitResourceRequests(resourceRequests, expiredSCList);
        this.prettyPrintRequests(response);
        this.validateSplit(response, resourceRequests);
        this.checkExpectedAllocation(response, "subcluster0", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster1", 1L, -1L);
        this.checkExpectedAllocation(response, "subcluster2", 1L, 100L);
        this.checkExpectedAllocation(response, "subcluster5", 1L, -1L);
        this.checkTotalContainerAllocation(response, 100L);
    }

    @Test
    public void testLoadBasedSubClusterReroute() throws YarnException {
        int pendingThreshold = 1000;
        LocalityMulticastAMRMProxyPolicy policy = (LocalityMulticastAMRMProxyPolicy)this.getPolicy();
        this.initializePolicy();
        SubClusterId sc0 = SubClusterId.newInstance((String)"0");
        SubClusterId sc1 = SubClusterId.newInstance((String)"1");
        SubClusterId sc2 = SubClusterId.newInstance((String)"2");
        SubClusterId sc3 = SubClusterId.newInstance((String)"3");
        SubClusterId sc4 = SubClusterId.newInstance((String)"4");
        HashSet<SubClusterId> scList = new HashSet<SubClusterId>();
        scList.add(sc0);
        scList.add(sc1);
        scList.add(sc2);
        scList.add(sc3);
        scList.add(sc4);
        policy.notifyOfResponse(sc0, this.getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
        policy.notifyOfResponse(sc1, this.getAllocateResponseWithEnhancedHeadroom(4 * pendingThreshold, 0));
        policy.notifyOfResponse(sc2, this.getAllocateResponseWithEnhancedHeadroom(2 * pendingThreshold, 0));
        policy.notifyOfResponse(sc3, this.getAllocateResponseWithEnhancedHeadroom(pendingThreshold, 0));
        policy.notifyOfResponse(sc4, this.getAllocateResponseWithEnhancedHeadroom(0, 0));
        Assert.assertEquals((Object)policy.routeNodeRequestIfNeeded(sc2, pendingThreshold, scList), (Object)sc2);
        Assert.assertEquals((Object)policy.routeNodeRequestIfNeeded(sc3, pendingThreshold, scList), (Object)sc3);
        Assert.assertEquals((Object)policy.routeNodeRequestIfNeeded(sc4, pendingThreshold, scList), (Object)sc4);
        HashMap<SubClusterId, Integer> counts = new HashMap<SubClusterId, Integer>();
        counts.put(sc0, 0);
        counts.put(sc1, 0);
        counts.put(sc2, 0);
        counts.put(sc3, 0);
        counts.put(sc4, 0);
        int n = 100000;
        for (int i = 0; i < n; ++i) {
            SubClusterId selectedId = policy.routeNodeRequestIfNeeded(sc0, pendingThreshold, scList);
            counts.put(selectedId, (Integer)counts.get(selectedId) + 1);
            selectedId = policy.routeNodeRequestIfNeeded(sc1, pendingThreshold, scList);
            counts.put(selectedId, (Integer)counts.get(selectedId) + 1);
            selectedId = policy.routeNodeRequestIfNeeded(SubClusterId.newInstance((String)"10"), pendingThreshold, scList);
            counts.put(selectedId, (Integer)counts.get(selectedId) + 1);
        }
        Assert.assertEquals((double)((double)((Integer)counts.get(sc0)).intValue() / (double)n / 3.0), (double)0.0625, (double)0.01);
        Assert.assertEquals((double)((double)((Integer)counts.get(sc1)).intValue() / (double)n / 3.0), (double)0.0625, (double)0.01);
        Assert.assertEquals((double)((double)((Integer)counts.get(sc2)).intValue() / (double)n / 3.0), (double)0.125, (double)0.01);
        Assert.assertEquals((double)((double)((Integer)counts.get(sc3)).intValue() / (double)n / 3.0), (double)0.25, (double)0.01);
        Assert.assertEquals((double)((double)((Integer)counts.get(sc4)).intValue() / (double)n / 3.0), (double)0.5, (double)0.01);
        Assert.assertEquals((long)5L, (long)counts.size());
    }

    private AllocateResponse getAllocateResponseWithEnhancedHeadroom(int pending, int activeCores) {
        return AllocateResponse.newInstance((int)0, null, null, Collections.emptyList(), (Resource)Resource.newInstance((int)0, (int)0), null, (int)10, null, Collections.emptyList(), null, null, null, (EnhancedHeadroom)EnhancedHeadroom.newInstance((int)pending, (int)activeCores));
    }

    private class TestableLocalityMulticastAMRMProxyPolicy
    extends LocalityMulticastAMRMProxyPolicy {
        private TestableLocalityMulticastAMRMProxyPolicy() {
        }

        protected SubClusterId getSubClusterForUnResolvedRequest(LocalityMulticastAMRMProxyPolicy.AllocationBookkeeper bookkeeper, long allocationId) {
            SubClusterId originalResult = super.getSubClusterForUnResolvedRequest(bookkeeper, allocationId);
            Map activeClusters = null;
            try {
                activeClusters = this.getActiveSubclusters();
            }
            catch (YarnException e) {
                throw new RuntimeException(e);
            }
            Assert.assertTrue((boolean)activeClusters.containsKey(originalResult));
            return TestLocalityMulticastAMRMProxyPolicy.this.getHomeSubCluster();
        }
    }
}

