/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.TestResourceTrackerService;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TestSchedulerOvercommit {
    private static final Logger LOG = LoggerFactory.getLogger(TestSchedulerOvercommit.class);
    protected static final int GB = 1024;
    protected static final int INTERVAL = 200;
    private MockRM rm;
    private ResourceScheduler scheduler;
    private MockNM nm;
    private NodeId nmId;
    private RMAppAttempt attempt;
    private MockAM am;

    /**
     * Starts a {@link MockRM} with a single node of 4096 memory and one
     * registered application whose AM uses 2048 of it, so every test starts
     * with the node half used.
     *
     * @throws Exception if the cluster or the application cannot be started.
     */
    @Before
    public void setup() throws Exception {
        LOG.info("Setting up the test cluster...");

        // Start the RM with the configuration provided by the concrete test
        rm = new MockRM(getConfiguration());
        rm.start();
        scheduler = rm.getResourceScheduler();

        // One node with 4096 of memory
        nm = rm.registerNode("127.0.0.1:1234", 4096);
        nmId = nm.getNodeId();

        // Submit an application whose AM asks for 2048 and get it running
        RMApp app = MockRMAppSubmitter.submit(
            rm,
            MockRMAppSubmissionData.Builder.createWithMemory(2048L, rm).build());
        nm.nodeHeartbeat(true);
        attempt = app.getCurrentAppAttempt();
        am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();

        // The AM takes half of the node
        assertMemory(scheduler, nmId, 2048L, 2048L);
        nm.nodeHeartbeat(true);
    }

    /**
     * Builds the configuration used to start the {@link MockRM}.
     * Being {@code protected}, subclasses may override it to adjust the
     * configuration.
     *
     * @return a {@link YarnConfiguration} whose node attribute store is the
     *         {@code NullNodeAttributeStore} from TestResourceTrackerService.
     */
    protected Configuration getConfiguration() {
        Configuration yarnConf = new YarnConfiguration();
        yarnConf.setClass(
            "yarn.node-attribute.fs-store.impl.class",
            TestResourceTrackerService.NullNodeAttributeStore.class,
            NodeAttributeStore.class);
        return yarnConf;
    }

    /**
     * Unregisters the application master and stops the {@link MockRM}.
     * The RM is stopped even if unregistering the AM or draining the events
     * fails, so a failing test does not leave a running RM behind.
     *
     * @throws Exception if unregistering the AM or stopping the RM fails.
     */
    @After
    public void cleanup() throws Exception {
        LOG.info("Cleaning up the test cluster...");
        try {
            if (am != null) {
                am.unregisterAppAttempt();
            }
        } finally {
            am = null;
            if (rm != null) {
                try {
                    rm.drainEvents();
                } finally {
                    // Always release the RM, whatever happened before
                    rm.stop();
                    rm = null;
                }
            }
        }
    }

    /**
     * Shrinks a full node below its usage with overcommit timeout -1 and
     * checks that no container is preempted, that the node ends up with
     * negative available memory and that no new container fits afterwards.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testReduceNoTimeout() throws Exception {
        // Fill the node: 2048 for the AM plus 2048 for this container
        Container container = createContainer(am, 2048);
        assertMemory(scheduler, nmId, 4096L, 0L);

        // Reduce the node to 2048 with timeout -1: usage stays, available goes negative
        updateNodeResource(rm, nmId, 2048, 2, -1);
        waitMemory(scheduler, nmId, 4096, -2048, 200, 2000);

        // After a heartbeat the NM reports the reduced capability
        nm.nodeHeartbeat(true);
        Assert.assertEquals(2048L, nm.getCapability().getMemorySize());

        // Nothing must be flagged for preemption
        assertNoPreemption(am.schedule().getPreemptionMessage());

        // Complete the container from the NM side
        ContainerStatus finished = BuilderUtils.newContainerStatus(
            container.getId(), ContainerState.COMPLETE, "", 0, container.getResource());
        nm.containerStatus(finished);

        LOG.info("Waiting for container to be finished for app...");
        GenericTestUtils.waitFor(
            () -> attempt.getJustFinishedContainers().size() == 1, 200L, 2000L);
        Assert.assertEquals(1L, am.schedule().getCompletedContainersStatuses().size());
        assertMemory(scheduler, nmId, 2048L, 0L);

        // A 3072 request must stay unsatisfied on the reduced node
        am.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 3072, 1, 1);
        AllocateResponse response = am.schedule();
        Assert.assertTrue("Shouldn't have enough resource to allocate containers",
            response.getAllocatedContainers().isEmpty());

        // Keep polling for a while to make sure nothing gets allocated later
        int remainingChecks = 10;
        while (remainingChecks > 0) {
            Thread.sleep(200L);
            response = am.schedule();
            Assert.assertTrue("Shouldn't have enough resource to allocate containers",
                response.getAllocatedContainers().isEmpty());
            remainingChecks--;
        }
    }

    /**
     * Changes the node size up, down to zero and back with overcommit
     * timeout -1 and checks that the used memory stays at 2048 while the
     * available memory follows the node size, with the attempt kept running.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testChangeResourcesNoTimeout() throws Exception {
        // Initial state: only the AM is running on the 4096 node
        waitMemory(scheduler, nmId, 2048, 2048, 100, 2000);

        // Grow the node to 5120
        updateNodeResource(rm, nmId, 5120, 2, -1);
        waitMemory(scheduler, nmId, 2048, 3072, 100, 2000);

        // Shrink the node to 0: the available memory becomes negative
        updateNodeResource(rm, nmId, 0, 2, -1);
        waitMemory(scheduler, nmId, 2048, -2048, 100, 2000);

        // Back to the original 4096
        updateNodeResource(rm, nmId, 4096, 2, -1);
        waitMemory(scheduler, nmId, 2048, 2048, 100, 2000);

        // The application survived all the changes
        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
    }

    /**
     * Shrinks a full node with overcommit timeout 0 and checks that the
     * extra container is reported as killed right away.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testReduceKill() throws Exception {
        // Fill the node: 2048 for the AM plus 2048 for this container
        Container container = createContainer(am, 2048);
        assertMemory(scheduler, nmId, 4096L, 0L);

        // Elapsed time is measured with the monotonic clock so a wall-clock
        // adjustment cannot break the timing assertion
        long t0 = Time.monotonicNow();

        // Reduce the node to 2048 with timeout 0
        updateNodeResource(rm, nmId, 2048, 2, 0);
        waitMemory(scheduler, nm, 2048, 0, 200, 400);

        // The container must come back as killed
        List<ContainerStatus> completedContainers =
            am.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(container.getId(), completedContainers.get(0));

        // It should have been killed almost immediately
        assertTime(0L, Time.monotonicNow() - t0);
    }

    /**
     * Shrinks a full node with a 2 second overcommit timeout and checks that
     * the extra container is first flagged for preemption and then reported
     * as killed once the timeout has passed.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testReducePreemptAndKill() throws Exception {
        // Fill the node: 2048 for the AM plus 2048 for this container
        Container container = createContainer(am, 2048);
        assertMemory(scheduler, nmId, 4096L, 0L);

        int timeout = (int) TimeUnit.SECONDS.toMillis(2L);

        // Elapsed time is measured with the monotonic clock so a wall-clock
        // adjustment cannot break the timing assertion
        long t0 = Time.monotonicNow();

        // Reduce the node to 2048; usage stays while the timeout is running
        updateNodeResource(rm, nmId, 2048, 2, timeout);
        waitMemory(scheduler, nm, 4096, -2048, 200, timeout);

        // The AM must be told which container is going to be preempted
        rm.drainEvents();
        PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
        assertPreemption(container.getId(), preemptMsg);

        // Eventually the container is gone and the node is exactly full
        waitMemory(scheduler, nm, 2048, 0, 200, timeout + 400);

        List<ContainerStatus> completedContainers =
            am.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(container.getId(), completedContainers.get(0));

        // The kill must not have happened before the timeout
        assertTime(timeout, Time.monotonicNow() - t0);
    }

    /**
     * Shrinks a full node with a 1 second overcommit timeout, restores the
     * original size after the preemption has been announced, and checks that
     * nothing is preempted or completed during the following 2 seconds.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testReducePreemptAndCancel() throws Exception {
        // Fill the node: 2048 for the AM plus 2048 for this container
        Container container = createContainer(am, 2048);
        assertMemory(scheduler, nmId, 4096L, 0L);

        // Reduce the node to 2048 with a 1 second timeout
        int timeout = (int) TimeUnit.SECONDS.toMillis(1L);
        updateNodeResource(rm, nmId, 2048, 2, timeout);
        waitMemory(scheduler, nm, 4096, -2048, 200, timeout);

        // The AM must be told which container is going to be preempted
        rm.drainEvents();
        PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
        assertPreemption(container.getId(), preemptMsg);

        // Restore the node size
        updateNodeResource(rm, nmId, 4096, 2, timeout);
        waitMemory(scheduler, nm, 4096, 0, 200, timeout);

        // Watch for 2 seconds; the monotonic clock keeps the observation
        // window correct even if the wall clock is adjusted meanwhile
        long observeMillis = TimeUnit.SECONDS.toMillis(2L);
        long t0 = Time.monotonicNow();
        while (Time.monotonicNow() - t0 < observeMillis) {
            nm.nodeHeartbeat(true);
            AllocateResponse allocation = am.schedule();
            assertNoPreemption(allocation.getPreemptionMessage());
            Assert.assertTrue(allocation.getCompletedContainersStatuses().isEmpty());
            Thread.sleep(200L);
        }

        // Both the AM and the container are still on the node
        assertMemory(scheduler, nmId, 4096L, 0L);
        Assert.assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
    }

    /**
     * Runs two applications with five containers in total on a node of 8192
     * and shrinks the node step by step with overcommit timeout 0, checking
     * after each step which containers are reported as killed.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testKillMultipleContainers() throws Exception {
        // Grow the node to 8192 so it can host everything
        updateNodeResource(rm, nmId, 8192, 6, -1);
        waitMemory(scheduler, nmId, 2048, 6144, 200, 5000);

        // Two containers of 1024 for the first application
        Container c1 = createContainer(am, 1024);
        Container c2 = createContainer(am, 1024);
        waitMemory(scheduler, nmId, 4096, 4096, 200, 5000);

        // Second application with an AM of 2048
        RMApp app2 = MockRMAppSubmitter.submit(
            rm,
            MockRMAppSubmissionData.Builder.createWithMemory(2048L, rm)
                .withAppName("app2")
                .withUser("user2")
                .build());
        nm.nodeHeartbeat(true);
        RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
        MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
        am2.registerAppAttempt();
        waitMemory(scheduler, nm, 6144, 2048, 200, 5000);
        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());

        // One container of 2048 for the second application fills the node
        Container c3 = createContainer(am2, 2048);
        waitMemory(scheduler, nm, 8192, 0, 200, 5000);
        Assert.assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers());

        // Shrink to 5120: c3 and c2 are expected to be killed
        updateNodeResource(rm, nmId, 5120, 6, 0);
        waitMemory(scheduler, nm, 5120, 0, 200, 5000);
        Assert.assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers());

        List<ContainerStatus> completedContainers =
            am2.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(c3.getId(), completedContainers.get(0));

        completedContainers = am.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(c2.getId(), completedContainers.get(0));

        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());

        // Shrink to 4096: c1 is expected to be killed
        updateNodeResource(rm, nmId, 4096, 6, 0);
        waitMemory(scheduler, nm, 4096, 0, 200, 5000);
        Assert.assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());

        completedContainers = am.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(c1.getId(), completedContainers.get(0));

        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());

        // Shrink to 2048: only one container is left and the second attempt fails
        updateNodeResource(rm, nmId, 2048, 6, 0);
        waitMemory(scheduler, nm, 2048, 0, 200, 5000);
        Assert.assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers());

        Assert.assertEquals(RMAppAttemptState.FAILED, attempt2.getState());
        Assert.assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
    }

    /**
     * Walks through a full overcommit scenario on a single node: reduce
     * without timeout, complete a container, fail to allocate, grow the node
     * and allocate, reduce with a timeout and cancel it, and finally reduce
     * with a timeout and let the container be killed.
     *
     * @throws Exception if the cluster interaction fails.
     */
    @Test
    public void testEndToEnd() throws Exception {
        // Fill the node: 2048 for the AM plus 2048 for this container
        Container c1 = createContainer(am, 2048);
        assertMemory(scheduler, nmId, 4096L, 0L);

        // The NM still reports the original capability
        nm.nodeHeartbeat(true);
        Assert.assertEquals(4096L, nm.getCapability().getMemorySize());

        // Reduce the node to 2048 with timeout -1: nothing is preempted
        updateNodeResource(rm, nmId, 2048, 2, -1);
        waitMemory(scheduler, nmId, 4096, -2048, 200, 5000);
        assertNoPreemption(am.schedule().getPreemptionMessage());

        // After a heartbeat the NM reports the reduced capability
        nm.nodeHeartbeat(true);
        Assert.assertEquals(2048L, nm.getCapability().getMemorySize());

        // Complete the first container from the NM side
        ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
            c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
        nm.containerStatus(containerStatus);

        LOG.info("Waiting for containers to be finished for app 1...");
        GenericTestUtils.waitFor(
            () -> attempt.getJustFinishedContainers().size() == 1, 100L, 2000L);
        Assert.assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
        assertMemory(scheduler, nmId, 2048L, 0L);

        // A 3072 request must stay unsatisfied on the reduced node
        am.addRequests(new String[]{"127.0.0.1", "127.0.0.2"}, 3072, 1, 1);
        AllocateResponse allocResponse2 = am.schedule();
        Assert.assertTrue("Shouldn't have enough resource to allocate containers",
            allocResponse2.getAllocatedContainers().isEmpty());
        for (int i = 0; i < 10; ++i) {
            Thread.sleep(100L);
            allocResponse2 = am.schedule();
            Assert.assertTrue("Shouldn't have enough resource to allocate containers",
                allocResponse2.getAllocatedContainers().isEmpty());
        }

        // Grow the node to 5120 so the pending request fits
        updateNodeResource(rm, nmId, 5120, 2, -1);
        waitMemory(scheduler, nmId, 2048, 3072, 100, 5000);
        nm.nodeHeartbeat(true);

        // Wait for the allocation, but give up after 5 seconds instead of
        // hanging the whole test run if the container never shows up
        long allocationDeadline = Time.monotonicNow() + 5000L;
        while (allocResponse2.getAllocatedContainers().isEmpty()
                && Time.monotonicNow() < allocationDeadline) {
            LOG.info("Waiting for containers to be created for app 1...");
            Thread.sleep(100L);
            allocResponse2 = am.schedule();
        }
        Assert.assertEquals(1, allocResponse2.getAllocatedContainers().size());
        Container c2 = allocResponse2.getAllocatedContainers().get(0);
        Assert.assertEquals(3072L, c2.getResource().getMemorySize());
        Assert.assertEquals(nmId, c2.getNodeId());
        assertMemory(scheduler, nmId, 5120L, 0L);

        // Reduce to 3072 with a 2 second timeout: c2 is flagged for preemption
        updateNodeResource(rm, nmId, 3072, 2, 2000);
        waitMemory(scheduler, nmId, 5120, -2048, 200, 5000);
        rm.drainEvents();
        PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
        assertPreemption(c2.getId(), preemptMsg);

        // Restore the size, wait longer than the 2000 ms timeout and check
        // that nothing was removed from the node
        updateNodeResource(rm, nmId, 5120, 2, -1);
        waitMemory(scheduler, nmId, 5120, 0, 200, 5000);
        Thread.sleep(3000L);
        assertMemory(scheduler, nmId, 5120L, 0L);

        // Reduce again and this time let the timeout expire; elapsed time is
        // measured with the monotonic clock so wall-clock jumps cannot break it
        long t0 = Time.monotonicNow();
        updateNodeResource(rm, nmId, 3072, 2, 2000);
        waitMemory(scheduler, nmId, 5120, -2048, 200, 5000);
        rm.drainEvents();
        preemptMsg = am.schedule().getPreemptionMessage();
        assertPreemption(c2.getId(), preemptMsg);

        // Heartbeat until the node reports free memory again
        GenericTestUtils.waitFor(() -> {
            try {
                nm.nodeHeartbeat(true);
            } catch (Exception e) {
                LOG.error("Cannot heartbeat", e);
            }
            SchedulerNodeReport report = scheduler.getNodeReport(nmId);
            return report.getAvailableResource().getMemorySize() > 0L;
        }, 200L, 5000L);
        assertMemory(scheduler, nmId, 2048L, 1024L);

        // c2 must be reported as killed, and not before the timeout
        List<ContainerStatus> completedContainers =
            am.schedule().getCompletedContainersStatuses();
        Assert.assertEquals(1, completedContainers.size());
        assertContainerKilled(c2.getId(), completedContainers.get(0));
        assertTime(2000L, Time.monotonicNow() - t0);
    }

    /**
     * Requests one container of the given memory and 1 vcore for an
     * application and waits, heartbeating the node, until it is allocated.
     * Up to 10 extra scheduling rounds 200 ms apart are tried.
     *
     * @param app application master that asks for the container.
     * @param memory memory of the container to request.
     * @return the allocated container; asserted to be the only one
     *         allocated, to have the requested memory and to be on the
     *         test node.
     * @throws Exception if the allocation or the heartbeat fails.
     */
    protected Container createContainer(MockAM app, int memory) throws Exception {
        ResourceRequest req = ResourceRequest.newBuilder()
            .capability(Resource.newInstance(memory, 1))
            .numContainers(1)
            .build();
        AllocateResponse response =
            app.allocate(Collections.singletonList(req), Collections.emptyList());
        List<Container> allocated = response.getAllocatedContainers();
        nm.nodeHeartbeat(true);

        // Keep scheduling until the container shows up or we run out of attempts
        for (int i = 0; allocated.isEmpty() && i < 10; ++i) {
            LOG.info("Waiting for containers to be created for app...");
            Thread.sleep(200L);
            response = app.schedule();
            allocated = response.getAllocatedContainers();
            nm.nodeHeartbeat(true);
        }

        Assert.assertFalse("Cannot create the container", allocated.isEmpty());
        Assert.assertEquals(1, allocated.size());

        Container c = allocated.get(0);
        Assert.assertEquals(memory, c.getResource().getMemorySize());
        Assert.assertEquals(nmId, c.getNodeId());
        return c;
    }

    /**
     * Changes the resources of a node through the admin service of the RM.
     *
     * @param rm resource manager whose admin service receives the update.
     * @param nmId node to update.
     * @param memory new memory of the node.
     * @param vCores new number of virtual cores of the node.
     * @param overcommitTimeout overcommit timeout passed to the
     *        {@link ResourceOption}.
     * @throws Exception if the admin service rejects the update.
     */
    public static void updateNodeResource(MockRM rm, NodeId nmId, int memory, int vCores, int overcommitTimeout) throws Exception {
        AdminService adminService = rm.getAdminService();
        Resource newCapability = Resource.newInstance(memory, vCores);
        ResourceOption option = ResourceOption.newInstance(newCapability, overcommitTimeout);
        adminService.updateNodeResource(
            UpdateNodeResourceRequest.newInstance(Collections.singletonMap(nmId, option)));
    }

    /**
     * Asserts that a container status describes the given container as
     * completed with exit status -102 and the scheduler preemption
     * diagnostics message.
     *
     * @param containerId expected container identifier.
     * @param status container status to check.
     */
    public static void assertContainerKilled(ContainerId containerId, ContainerStatus status) {
        Assert.assertEquals(containerId, status.getContainerId());
        Assert.assertEquals(ContainerState.COMPLETE, status.getState());
        // NOTE(review): -102 presumably matches ContainerExitStatus.PREEMPTED - confirm
        Assert.assertEquals(-102, status.getExitStatus());
        Assert.assertEquals("Container preempted by scheduler", status.getDiagnostics());
    }

    /**
     * Asserts that an elapsed time is strictly greater than the expected
     * time and strictly less than the expected time plus 400 ms.
     *
     * @param expectedTime minimum time in milliseconds (exclusive).
     * @param time measured time in milliseconds.
     */
    public static void assertTime(long expectedTime, long time) {
        boolean longEnough = time > expectedTime;
        Assert.assertTrue("Too short: " + time + "ms", longEnough);

        // 400 ms of slack on top of the expected time
        boolean shortEnough = time < expectedTime + 400L;
        Assert.assertTrue("Too long: " + time + "ms", shortEnough);
    }

    /**
     * Fails if a preemption message carries a contract with at least one
     * container. A null message or a null contract counts as no preemption;
     * the strict contract is not inspected.
     *
     * @param msg preemption message to check, may be null.
     */
    public static void assertNoPreemption(PreemptionMessage msg) {
        if (msg == null || msg.getContract() == null) {
            return;
        }
        if (msg.getContract().getContainers().isEmpty()) {
            return;
        }
        Assert.fail("We shouldn't preempt containers: " + msg);
    }

    /**
     * Asserts that a preemption message flags exactly one container, the
     * given one. Containers from both the contract and the strict contract
     * are taken into account.
     *
     * @param containerId the only container expected to be preempted.
     * @param msg preemption message to check; must not be null.
     */
    public static void assertPreemption(ContainerId containerId, PreemptionMessage msg) {
        Assert.assertNotNull("Expected a preemption message", msg);

        // Gather the identifiers from both kinds of contract
        HashSet<ContainerId> flagged = new HashSet<>();
        if (msg.getContract() != null) {
            msg.getContract().getContainers().forEach(pc -> flagged.add(pc.getId()));
        }
        if (msg.getStrictContract() != null) {
            msg.getStrictContract().getContainers().forEach(pc -> flagged.add(pc.getId()));
        }

        Assert.assertEquals(Collections.singleton(containerId), flagged);
    }

    /**
     * Asserts the used and available memory that the scheduler reports for
     * a node.
     *
     * @param scheduler scheduler to ask for the node report.
     * @param nmId node to check; a report must exist for it.
     * @param expectedUsed expected used memory.
     * @param expectedAvailable expected available memory; may be negative.
     */
    public static void assertMemory(ResourceScheduler scheduler, NodeId nmId, long expectedUsed, long expectedAvailable) {
        SchedulerNodeReport report = scheduler.getNodeReport(nmId);
        Assert.assertNotNull(report);

        long usedMemory = report.getUsedResource().getMemorySize();
        Assert.assertEquals("Used memory", expectedUsed, usedMemory);

        long availableMemory = report.getAvailableResource().getMemorySize();
        Assert.assertEquals("Available memory", expectedAvailable, availableMemory);
    }

    /**
     * Waits until the scheduler reports the expected memory for a node,
     * without sending node heartbeats while waiting.
     *
     * @param scheduler scheduler to ask for the node report.
     * @param nmId node to check.
     * @param expectedUsed expected used memory.
     * @param expectedAvailable expected available memory.
     * @param checkEveryMillis pause between checks in milliseconds.
     * @param waitForMillis maximum time to wait in milliseconds.
     * @throws Exception if the expected values are not reached in time.
     */
    public static void waitMemory(ResourceScheduler scheduler, NodeId nmId, int expectedUsed, int expectedAvailable, int checkEveryMillis, int waitForMillis) throws Exception {
        // No node manager to heartbeat: pass null to the full variant
        waitMemory(scheduler, nmId, null,
            expectedUsed, expectedAvailable, checkEveryMillis, waitForMillis);
    }

    /**
     * Waits until the scheduler reports the expected memory for the node of
     * a node manager, passing the node manager along so it is heartbeated
     * while waiting.
     *
     * @param scheduler scheduler to ask for the node report.
     * @param nm node manager whose node is checked.
     * @param expectedUsed expected used memory.
     * @param expectedAvailable expected available memory.
     * @param checkEveryMillis pause between checks in milliseconds.
     * @param waitForMillis maximum time to wait in milliseconds.
     * @throws Exception if the expected values are not reached in time.
     */
    public static void waitMemory(ResourceScheduler scheduler, MockNM nm, int expectedUsed, int expectedAvailable, int checkEveryMillis, int waitForMillis) throws Exception {
        NodeId nodeId = nm.getNodeId();
        waitMemory(scheduler, nodeId, nm,
            expectedUsed, expectedAvailable, checkEveryMillis, waitForMillis);
    }

    /**
     * Polls the scheduler until it reports the expected used and available
     * memory for a node, or the time runs out.
     *
     * @param scheduler scheduler to ask for the node report.
     * @param nmId node to check.
     * @param nm node manager to heartbeat before each check; null to skip
     *        the heartbeats.
     * @param expectedUsed expected used memory.
     * @param expectedAvailable expected available memory.
     * @param checkEveryMillis pause after a failed check in milliseconds.
     * @param waitForMillis maximum time to wait in milliseconds.
     * @throws TimeoutException if the expected values are not reached in
     *         time; its cause is the last failed assertion, if any.
     * @throws Exception if the heartbeat fails or the wait is interrupted.
     */
    public static void waitMemory(ResourceScheduler scheduler, NodeId nmId, MockNM nm, int expectedUsed, int expectedAvailable, int checkEveryMillis, int waitForMillis) throws Exception {
        long start = Time.monotonicNow();
        AssertionError lastFailure = null;
        while (Time.monotonicNow() - start < waitForMillis) {
            try {
                if (nm != null) {
                    nm.nodeHeartbeat(true);
                }
                assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
                return;
            } catch (AssertionError e) {
                // Not there yet: remember why and try again later
                lastFailure = e;
                Thread.sleep(checkEveryMillis);
            }
        }

        // No success: report the timeout with the last observed values. The
        // report can be missing (assertMemory checks for that too), so guard
        // against it instead of hiding the timeout behind a
        // NullPointerException.
        SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
        String actual;
        if (nmReport == null) {
            actual = "no report for node " + nmId;
        } else {
            actual = nmReport.getUsedResource() + "," + nmReport.getAvailableResource();
        }
        TimeoutException timeout = new TimeoutException(
            "Took longer than " + waitForMillis + "ms to get to "
                + expectedUsed + "," + expectedAvailable + " actual=" + actual);
        if (lastFailure != null) {
            // TimeoutException has no constructor taking a cause
            timeout.initCause(lastFailure);
        }
        throw timeout;
    }
}

