/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.labelmanagement.LabelManagementService;
import org.apache.hadoop.yarn.server.resourcemanager.labelmanagement.LabelManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests the cluster-utilization threshold that gates preemption in the
 * {@link FairScheduler}.
 *
 * <p>The scheduler under test is a {@link StubbedFairScheduler}, which does not
 * preempt anything: it only records the total memory it was asked to preempt.
 * Each scenario fills a node to a known utilization, starves a second queue,
 * advances a {@link ControlledClock} and then checks whether the stub was
 * invoked.
 */
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    private static final String ALLOC_FILE = new File(TEST_DIR, TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
    private static final String LABEL_FILE = TEST_DIR + "/labelFile";
    private static final String STATIC_HOST = "127.0.0.1";
    private static final String MSG_PREEMPTED = "preemptResources() should have been called";
    private static final String MSG_NOT_PREEMPTED = "preemptResources() should not have been called";
    private ControlledClock clock;

    /**
     * Builds the base configuration: installs {@link StubbedFairScheduler} as the
     * scheduler class, enables preemption and points the scheduler at
     * {@link #ALLOC_FILE}.
     *
     * @return the configuration used to start the mock resource manager
     */
    @Override
    public Configuration createConfiguration() {
        Configuration conf = super.createConfiguration();
        conf.setClass("yarn.resourcemanager.scheduler.class", StubbedFairScheduler.class, ResourceScheduler.class);
        conf.setBoolean("yarn.scheduler.fair.preemption", true);
        conf.set("yarn.scheduler.fair.allocation.file", ALLOC_FILE);
        return conf;
    }

    /** Creates a fresh configuration and controlled clock for every test. */
    @Before
    public void setup() throws IOException {
        this.conf = this.createConfiguration();
        this.clock = new ControlledClock();
    }

    /** Stops the resource manager, if one was started, and drops the configuration. */
    @After
    public void teardown() {
        if (this.resourceManager != null) {
            this.resourceManager.stop();
            this.resourceManager = null;
        }
        this.conf = null;
    }

    /**
     * Starts a {@link MockRM} with the given preemption utilization threshold and
     * wires the controlled clock into its scheduler.
     *
     * @param utilizationThreshold value for
     *     {@code yarn.scheduler.fair.preemption.cluster-utilization-threshold}
     */
    private void startResourceManager(float utilizationThreshold) {
        this.conf.setFloat("yarn.scheduler.fair.preemption.cluster-utilization-threshold", utilizationThreshold);
        this.resourceManager = new MockRM(this.conf);
        this.resourceManager.start();
        Assert.assertTrue(this.resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
        this.scheduler = (FairScheduler) this.resourceManager.getResourceScheduler();
        this.scheduler.setClock(this.clock);
        // Large update interval; the tests call scheduler.update() explicitly.
        this.scheduler.updateInterval = 60000L;
    }

    /**
     * Creates a mock node with the given capacity, maps its host name to
     * {@link #STATIC_HOST} and announces it to the scheduler.
     *
     * @param memory node memory, in MB
     * @param vcores node virtual cores
     * @param disks node disks
     * @param hostName host name of the node
     * @return the node that was added
     */
    private RMNode addNode(int memory, int vcores, double disks, String hostName) {
        RMNode node = MockNodes.newNodeInfo(1, Resources.createResource(memory, vcores, disks), 1, hostName);
        NetUtils.addStaticResolution(node.getHostName(), STATIC_HOST);
        this.scheduler.handle(new NodeAddedSchedulerEvent(node));
        return node;
    }

    /**
     * Sends {@code count} node-update events for {@code node} to the scheduler.
     */
    private void sendNodeUpdates(RMNode node, int count) {
        for (int i = 0; i < count; ++i) {
            this.scheduler.handle(new NodeUpdateSchedulerEvent(node));
        }
    }

    /**
     * Submits a single 1024 MB request for {@code user1} on {@code queue}, updates
     * the scheduler and advances the clock by 6 seconds, i.e. past the
     * {@code defaultMinSharePreemptionTimeout} of 5 written to the allocation file.
     */
    private void starveQueueAndAdvanceClock(String queue) {
        this.createSchedulingRequest(1024, queue, "user1", 1, 1);
        this.scheduler.update();
        this.clock.tickSec(6);
    }

    /**
     * Resets the stub's recorded value, runs the scheduler's preemption check and
     * returns what the stub recorded.
     *
     * @return total memory passed to {@code preemptResources}, or {@code -1} if it
     *     was not called
     */
    private int runPreemptionCheck() {
        StubbedFairScheduler stub = (StubbedFairScheduler) this.scheduler;
        stub.resetLastPreemptResources();
        this.scheduler.preemptTasksIfNecessary();
        return stub.lastPreemptMemory;
    }

    /**
     * Registers {@code node1} with the given capacity, verifies the root metrics
     * reflect it, then submits an application on {@code queueA} and drives three
     * node updates so its containers get allocated.
     *
     * @param memory node memory, in MB
     * @param vcores node virtual cores
     * @param disks node disks
     * @param appContainers number of containers the application requests
     * @param appMemory memory per container, in MB
     */
    private void registerNodeAndSubmitApp(int memory, int vcores, double disks, int appContainers, int appMemory) {
        RMNode node1 = this.addNode(memory, vcores, disks, "node1");
        Assert.assertEquals("Incorrect amount of resources in the cluster", memory, this.scheduler.rootMetrics.getAvailableMB());
        Assert.assertEquals("Incorrect amount of resources in the cluster", vcores, this.scheduler.rootMetrics.getAvailableVirtualCores());
        Assert.assertEquals("Incorrect amount of resources in the cluster", disks, this.scheduler.rootMetrics.getAvailableDisks(), 0.001);
        this.createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
        this.scheduler.update();
        this.sendNodeUpdates(node1, 3);
        Assert.assertEquals("app1's request is not met", memory - appContainers * appMemory, this.scheduler.rootMetrics.getAvailableMB());
    }

    /**
     * Checks the utilization threshold on a single 4096 MB node:
     * threshold 0.0 with 2 of 4 GB used expects preemption, threshold 0.8 with
     * 3 of 4 GB used expects none, and threshold 0.7 with 3 of 4 GB used expects
     * preemption again.
     */
    @Test
    public void testPreemptionWithFreeResources() throws Exception {
        // try-with-resources guarantees the file handle is released even if a write fails.
        try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
            out.println("<?xml version=\"1.0\"?>");
            out.println("<allocations>");
            out.println("<queue name=\"default\">");
            out.println("<maxResources>0mb,0vcores,0disks</maxResources>");
            out.println("</queue>");
            out.println("<queue name=\"queueA\">");
            out.println("<weight>1</weight>");
            out.println("<minResources>1024mb,0vcores,0disks</minResources>");
            out.println("</queue>");
            out.println("<queue name=\"queueB\">");
            out.println("<weight>1</weight>");
            out.println("<minResources>1024mb,0vcores,0disks</minResources>");
            out.println("</queue>");
            out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
            out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
            out.println("</allocations>");
        }

        // Threshold 0.0: preemption is expected.
        this.startResourceManager(0.0f);
        this.registerNodeAndSubmitApp(4096, 4, 1.0, 2, 1024);
        this.starveQueueAndAdvanceClock("queueB");
        Assert.assertEquals(MSG_PREEMPTED, 1024L, this.runPreemptionCheck());
        this.resourceManager.stop();

        // Threshold 0.8 with 3072 of 4096 MB allocated: no preemption expected.
        this.startResourceManager(0.8f);
        this.registerNodeAndSubmitApp(4096, 4, 1.0, 3, 1024);
        this.starveQueueAndAdvanceClock("queueB");
        Assert.assertEquals(MSG_NOT_PREEMPTED, -1L, this.runPreemptionCheck());
        this.resourceManager.stop();

        // Threshold 0.7 with 3072 of 4096 MB allocated: preemption expected.
        this.startResourceManager(0.7f);
        this.registerNodeAndSubmitApp(4096, 4, 1.0, 3, 1024);
        this.starveQueueAndAdvanceClock("queueB");
        Assert.assertEquals(MSG_PREEMPTED, 1024L, this.runPreemptionCheck());
    }

    /**
     * Repeats the threshold check with label-based utilization enabled:
     * {@code node1} is labelled {@code Plain} and {@code node2} is labelled
     * {@code Large}, with queues {@code plain1}/{@code plain2} bound to
     * {@code Plain} and queue {@code large} bound to {@code Large}.
     *
     * <p>The label service is stopped and the label file removed in a
     * {@code finally} block so a failing assertion cannot leave them behind.
     */
    @Test
    public void testPreemptionThresholdWithLbs() throws Exception {
        this.conf.set("node.labels.file", LABEL_FILE);
        this.conf.setBoolean("yarn.scheduler.fair.preemption.cluster-utilization-threshold.based-on-labels-enabled", true);

        try (PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE))) {
            out.println("<?xml version=\"1.0\"?>");
            out.println("<allocations>");
            out.println("<queue name=\"default\">");
            out.println("  <maxResources>0mb,0vcores,0disks</maxResources>");
            out.println("</queue>");
            out.println("<queue name=\"plain1\">");
            out.println("  <weight>1</weight>");
            out.println("  <label>Plain</label>");
            out.println("  <minResources>1024mb,0vcores,0disks</minResources>");
            out.println("</queue>");
            out.println("<queue name=\"plain2\">");
            out.println("  <weight>1</weight>");
            out.println("  <label>Plain</label>");
            out.println("  <minResources>1024mb,0vcores,0disks</minResources>");
            out.println("</queue>");
            out.println("<queue name=\"large\">");
            out.println("  <weight>1</weight>");
            out.println("  <label>Large</label>");
            out.println("  <minResources>1024mb,0vcores,0disks</minResources>");
            out.println("</queue>");
            out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
            out.println("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
            out.println("</allocations>");
        }
        try (PrintWriter out = new PrintWriter(new FileWriter(LABEL_FILE))) {
            out.println("node1  Plain");
            out.println("node2  Large");
        }

        this.lbS = new LabelManagementService();
        this.lbS.init(this.conf);
        this.lbS.start();
        LabelManager lb = LabelManager.getInstance();
        try {
            lb.refreshLabels(this.conf);

            // Threshold 0.7, node1 has 3072 of 4096 MB allocated: preemption expected.
            this.startResourceManager(0.7f);
            RMNode node1 = this.addNode(4096, 4, 1.0, "node1");
            this.createSchedulingRequest(1024, "plain1", "user1", 3);
            this.scheduler.update();
            this.sendNodeUpdates(node1, 3);
            this.starveQueueAndAdvanceClock("plain2");
            Assert.assertEquals(MSG_PREEMPTED, 1024L, this.runPreemptionCheck());
            this.resourceManager.stop();

            // Threshold 0.8, same load on node1: no preemption expected.
            this.startResourceManager(0.8f);
            node1 = this.addNode(4096, 4, 1.0, "node1");
            this.createSchedulingRequest(1024, "plain1", "user1", 3);
            this.scheduler.update();
            this.sendNodeUpdates(node1, 3);
            this.starveQueueAndAdvanceClock("plain2");
            Assert.assertEquals(MSG_NOT_PREEMPTED, -1L, this.runPreemptionCheck());

            // Add node2 (label Large) and load it through the "large" queue;
            // the test still expects no preemption to be triggered.
            RMNode node2 = this.addNode(10240, 10, 1.0, "node2");
            this.createSchedulingRequest(1024, "large", "user1", 10);
            this.scheduler.update();
            this.sendNodeUpdates(node2, 10);
            Assert.assertEquals(MSG_NOT_PREEMPTED, -1L, this.runPreemptionCheck());
        } finally {
            // Always undo the label setup, even when an assertion above fails.
            this.lbS.stop();
            LocalFileSystem fs = FileSystem.getLocal(this.conf);
            fs.delete(new Path(LABEL_FILE), false);
            lb.refreshLabels(this.conf);
        }
    }

    /**
     * {@link FairScheduler} whose {@code preemptResources} records the total
     * memory requested for preemption instead of preempting anything.
     */
    private static class StubbedFairScheduler
    extends FairScheduler {
        /** Memory from the last {@code preemptResources} call; {@code -1} after a reset. */
        public int lastPreemptMemory = -1;

        private StubbedFairScheduler() {
        }

        /**
         * Sums the resources in {@code toPreempt} and stores the memory component
         * in {@link #lastPreemptMemory}.
         *
         * @param toPreempt resources to preempt, keyed by application attempt
         */
        @Override
        protected void preemptResources(Map<FSAppAttempt, Resource> toPreempt) {
            Resource totalResource = Resources.createResource(0);
            for (Resource resource : toPreempt.values()) {
                totalResource = Resources.add(totalResource, resource);
            }
            this.lastPreemptMemory = totalResource.getMemory();
        }

        /** Resets {@link #lastPreemptMemory} to {@code -1}, meaning "not called". */
        public void resetLastPreemptResources() {
            this.lastPreemptMemory = -1;
        }
    }
}

