/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerWithMockPreemption;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestFSAppStarvation
extends FairSchedulerTestBase {
    private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
    private final ControlledClock clock = new ControlledClock();
    private static final int NODE_CAPACITY_MULTIPLE = 4;
    private static final String[] QUEUES = new String[]{"no-preemption", "minshare", "fairshare.child", "drf.child"};
    private FairSchedulerWithMockPreemption.MockPreemptionThread preemptionThread;

    @Before
    public void setup() {
        this.createConfiguration();
        this.conf.set("yarn.resourcemanager.scheduler.class", FairSchedulerWithMockPreemption.class.getCanonicalName());
        this.conf.set("yarn.scheduler.fair.allocation.file", ALLOC_FILE.getAbsolutePath());
        this.conf.setBoolean("yarn.scheduler.fair.preemption", true);
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 0.0f);
        this.conf.setLong("yarn.scheduler.fair.update-interval-ms", Long.MAX_VALUE);
    }

    @After
    public void teardown() {
        ALLOC_FILE.delete();
        this.conf = null;
        if (this.resourceManager != null) {
            this.resourceManager.stop();
            this.resourceManager = null;
        }
    }

    @Test
    public void testPreemptionDisabled() throws Exception {
        this.conf.setBoolean("yarn.scheduler.fair.preemption", false);
        this.setupClusterAndSubmitJobs();
        Assert.assertNull((String)"Found starved apps even when preemption is turned off", (Object)this.scheduler.getContext().getStarvedApps());
    }

    @Test
    public void testPreemptionEnabled() throws Exception {
        int i;
        this.setupClusterAndSubmitJobs();
        for (i = 0; i < 6000 && this.preemptionThread.uniqueAppsAdded() < 3; ++i) {
            Thread.sleep(10L);
        }
        Assert.assertNotNull((String)"FSContext does not have an FSStarvedApps instance", (Object)this.scheduler.getContext().getStarvedApps());
        Assert.assertEquals((String)"Expecting 3 starved applications, one each for the minshare and fairshare queues", (long)3L, (long)this.preemptionThread.uniqueAppsAdded());
        this.clock.tickSec(1);
        this.scheduler.update();
        Assert.assertEquals((String)"Apps re-added even before starvation delay passed", (long)this.preemptionThread.totalAppsAdded(), (long)this.preemptionThread.uniqueAppsAdded());
        this.verifyLeafQueueStarvation();
        this.clock.tickMsec(600000L);
        this.scheduler.update();
        for (i = 0; i < 6000 && this.preemptionThread.totalAppsAdded() < this.preemptionThread.uniqueAppsAdded() * 2; ++i) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Each app should be marked as starved once at each scheduler update above", (long)this.preemptionThread.totalAppsAdded(), (long)(this.preemptionThread.uniqueAppsAdded() * 2));
    }

    @Test
    public void testClusterUtilizationThreshold() throws Exception {
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", 1.1f);
        this.setupClusterAndSubmitJobs();
        Assert.assertNotNull((String)"FSContext does not have an FSStarvedApps instance", (Object)this.scheduler.getContext().getStarvedApps());
        Assert.assertEquals((String)"Found starved apps when preemption threshold is over 100%", (long)0L, (long)this.preemptionThread.totalAppsAdded());
    }

    private void verifyLeafQueueStarvation() {
        for (String q : QUEUES) {
            if (q.equals("no-preemption")) continue;
            boolean isStarved = this.scheduler.getQueueManager().getLeafQueue(q, false).isStarved();
            Assert.assertTrue((boolean)isStarved);
        }
    }

    private void setupClusterAndSubmitJobs() throws Exception {
        this.setupStarvedCluster();
        this.submitAppsToEachLeafQueue();
        this.sendEnoughNodeUpdatesToAssignFully();
        this.clock.tickMsec(10L);
        this.scheduler.update();
    }

    private void setupStarvedCluster() {
        AllocationFileWriter.create().drfDefaultQueueSchedulingPolicy().addQueue(new AllocationFileQueue.Builder("default").build()).addQueue(new AllocationFileQueue.Builder("no-preemption").fairSharePreemptionThreshold(0.0).build()).addQueue(new AllocationFileQueue.Builder("minshare").fairSharePreemptionThreshold(0.0).minSharePreemptionTimeout(0).minResources("2048mb,2vcores").build()).addQueue(new AllocationFileQueue.Builder("fairshare").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).schedulingPolicy("fair").subQueue(new AllocationFileQueue.Builder("child").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).schedulingPolicy("fair").build()).build()).addQueue(new AllocationFileQueue.Builder("drf").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).schedulingPolicy("drf").subQueue(new AllocationFileQueue.Builder("child").fairSharePreemptionThreshold(1.0).fairSharePreemptionTimeout(0).schedulingPolicy("drf").build()).build()).writeToFile(ALLOC_FILE.getAbsolutePath());
        Assert.assertTrue((String)"Allocation file does not exist, not running the test", (boolean)ALLOC_FILE.exists());
        this.resourceManager = new MockRM(this.conf);
        this.scheduler = (FairScheduler)this.resourceManager.getResourceScheduler();
        this.scheduler.setClock((Clock)this.clock);
        this.resourceManager.start();
        this.preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)((Object)this.scheduler.preemptionThread);
        this.addNode(4096, 4);
        this.addNode(4096, 4);
        ApplicationAttemptId app = this.createSchedulingRequest(1024, 1, "root.default", "default", 8);
        this.scheduler.update();
        this.sendEnoughNodeUpdatesToAssignFully();
        Assert.assertEquals((long)8L, (long)this.scheduler.getSchedulerApp(app).getLiveContainers().size());
    }

    private void submitAppsToEachLeafQueue() {
        for (String queue : QUEUES) {
            this.createSchedulingRequest(1024, 1, "root." + queue, "user", 1);
        }
        this.scheduler.update();
    }

    private void sendEnoughNodeUpdatesToAssignFully() {
        for (RMNode node : this.rmNodes) {
            NodeUpdateSchedulerEvent nodeUpdateSchedulerEvent = new NodeUpdateSchedulerEvent(node);
            for (int i = 0; i < 4; ++i) {
                this.scheduler.handle((SchedulerEvent)nodeUpdateSchedulerEvent);
            }
        }
    }
}

