/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSorter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests the CapacityScheduler with multi-node placement enabled (using the
 * {@code ResourceUsageMultiNodeLookupPolicy} sorting policy) together with the
 * {@link ProportionalCapacityPreemptionPolicy} scheduling monitor.
 */
public class TestCapacitySchedulerMultiNodesWithPreemption {
    private static final Logger LOG = LoggerFactory.getLogger(TestCapacitySchedulerMultiNodesWithPreemption.class);
    private static final String POLICY_CLASS_NAME = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy";
    /** Number of megabytes in one gigabyte; node and container sizes below are given in MB. */
    private static final int GB = 1024;

    private CapacitySchedulerConfiguration conf;

    /**
     * Builds the scheduler configuration shared by the tests: two queues
     * ({@code A} and {@code default}) at 50% capacity / 100% maximum capacity,
     * multi-node placement with the "resource-based" sorting policy, and the
     * preemption monitor enabled.
     */
    @Before
    public void setUp() {
        CapacitySchedulerConfiguration config = new CapacitySchedulerConfiguration();
        config.set("yarn.scheduler.capacity.resource-calculator", DominantResourceCalculator.class.getName());
        conf = new CapacitySchedulerConfiguration(config);
        conf.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);

        // Multi-node placement using the resource-usage based sorting policy.
        conf.set("yarn.scheduler.capacity.multi-node-sorting.policy.names", "resource-based");
        conf.set("yarn.scheduler.capacity.multi-node-sorting.policy", "resource-based");
        conf.set("yarn.scheduler.capacity.multi-node-sorting.policy.resource-based.class", POLICY_CLASS_NAME);
        conf.setBoolean("yarn.scheduler.capacity.multi-node-placement-enabled", true);

        // Allocation limits.
        conf.set("yarn.scheduler.capacity.maximum-am-resource-percent", "1");
        conf.setInt("yarn.scheduler.minimum-allocation-mb", 512);
        conf.setInt("yarn.scheduler.minimum-allocation-vcores", 1);
        conf.setInt("yarn.scheduler.maximum-allocation-mb", 102400);

        // Queue layout: root.A and root.default.
        conf.set("yarn.scheduler.capacity.root.queues", "A, default");
        conf.set("yarn.scheduler.capacity.root.A.capacity", "50");
        conf.set("yarn.scheduler.capacity.root.default.capacity", "50");
        conf.set("yarn.scheduler.capacity.root.A.maximum-capacity", "100");
        conf.set("yarn.scheduler.capacity.root.default.maximum-capacity", "100");
        conf.set("yarn.scheduler.capacity.root.A.user-limit-factor", "10");
        conf.set("yarn.scheduler.capacity.root.default.user-limit-factor", "10");

        // Preemption monitor settings.
        conf.setLong("yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill", 10000L);
        conf.setLong("yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval", 1500L);
        conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round", 1.0f);
        conf.setFloat("yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor", 1.0f);
        conf.set("yarn.resourcemanager.scheduler.monitor.policies", ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
        conf.setBoolean("yarn.resourcemanager.scheduler.monitor.enable", true);

        conf.setLong("yarn.resourcemanager.nodemanagers.heartbeat-interval-ms", 60000L);
    }

    /**
     * Scenario:
     * <ol>
     *   <li>app-1 (queue {@code default}) is given a 3 GB AM plus a 2 GB and a
     *       1 GB container on a 1 GB / 2 GB / 3 GB three-node cluster.</li>
     *   <li>app-2 (queue {@code A}) is submitted with a 1 GB AM and launched
     *       from a background thread.</li>
     *   <li>Once some node reports exactly 1 GB available (presumably freed by
     *       preemption of an app-1 container), all unallocated resource is
     *       deducted from that node and a node update is sent; app-2 is then
     *       expected to hold one reservation on a <em>different</em> node.</li>
     *   <li>The node's total resource is then increased by its allocated
     *       resource and another node update is sent; app-2 is expected to end
     *       up with one live container and no reservation.</li>
     * </ol>
     *
     * @throws Exception if the mock cluster fails or any wait times out
     */
    @Test(timeout=60000L)
    public void testAllocateReservationFromOtherNode() throws Exception {
        final MockRM rm = new MockRM(conf);
        Thread am2Launcher = null;
        try {
            rm.start();
            final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 1 * GB, 2);
            final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * GB, 2);
            final MockNM nm3 = rm.registerNode("127.0.0.3:1234", 3 * GB, 2);
            final MockNM[] nms = {nm1, nm2, nm3};
            final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

            // Force a re-sort so the sorting policy knows about the freshly registered nodes.
            rm.getRMContext().getMultiNodeSortingManager()
                .getMultiNodePolicy(POLICY_CLASS_NAME)
                .reSortClusterNodes();

            // Step 1: app-1 in the default queue requests 3 GB + 2 GB + 1 GB.
            RMApp app1 = MockRMAppSubmitter.submit(rm,
                MockRMAppSubmissionData.Builder.createWithMemory(3 * GB, rm)
                    .withAppName("app-1")
                    .withUser("user1")
                    .withAcls(null)
                    .withQueue("default")
                    .build());
            MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
            am1.allocateAndWaitForContainers("*", 1, 2 * GB, nm2);
            am1.allocateAndWaitForContainers("*", 1, 1 * GB, nm3);

            // Wait until the first two nodes report no available memory.
            GenericTestUtils.waitFor(() -> {
                SchedulerNodeReport reportNm1 = cs.getNodeReport(nms[0].getNodeId());
                SchedulerNodeReport reportNm2 = cs.getNodeReport(nms[1].getNodeId());
                return reportNm1.getAvailableResource().getMemorySize() == 0L
                    && reportNm2.getAvailableResource().getMemorySize() == 0L;
            }, 10, 10000);

            // Step 2: app-2 in queue A; its AM is launched from a background thread.
            final AtomicBoolean result = new AtomicBoolean(false);
            // An assertion raised on the launcher thread would never reach JUnit,
            // so the failure is recorded here and re-thrown on the test thread.
            final AtomicReference<Throwable> launchFailure = new AtomicReference<>();
            final RMApp app2 = MockRMAppSubmitter.submit(rm,
                MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm)
                    .withAppName("app-2")
                    .withUser("user2")
                    .withAcls(null)
                    .withQueue("A")
                    .build());
            am2Launcher = new Thread(() -> {
                try {
                    MockRM.launchAM(app2, rm, nm1);
                    result.set(true);
                } catch (Exception | AssertionError e) {
                    launchFailure.set(e);
                }
            }, "app-2-am-launcher");
            am2Launcher.start();

            // Step 3: find the first node that reports exactly 1 GB available.
            final AtomicReference<MockNM> preemptedNode = new AtomicReference<>();
            GenericTestUtils.waitFor(() -> {
                for (MockNM nm : nms) {
                    SchedulerNodeReport report = cs.getNodeReport(nm.getNodeId());
                    if (report.getAvailableResource().getMemorySize() == GB) {
                        preemptedNode.set(nm);
                        return true;
                    }
                }
                return false;
            }, 10, 30000);
            final MockNM preemptedNm = preemptedNode.get();
            LOG.info("Preempted node is: {}", preemptedNm.getNodeId());

            // Remove every unallocated resource from that node so nothing can be placed on it.
            FiCaSchedulerNode schedulerNode = cs.getNodeTracker().getNode(preemptedNm.getNodeId());
            Resource curResource = schedulerNode.getUnallocatedResource();
            schedulerNode.deductUnallocatedResource(Resource.newInstance(curResource));
            reAddNodeAndSendUpdate(rm, preemptedNm, schedulerNode);

            // app-2 must now hold exactly one reservation, and not on the drained node.
            ApplicationAttemptId app2AttemptId = app2.getCurrentAppAttempt().getAppAttemptId();
            FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(app2AttemptId);
            Assert.assertEquals("App2 failed to get reserved container",
                1, schedulerApp.getReservedContainers().size());
            LOG.info("Reserved node is: {}", schedulerApp.getReservedContainers().get(0).getReservedNode());
            Assert.assertNotEquals("Failed to reserve as per the Multi Node Itearor",
                schedulerApp.getReservedContainers().get(0).getReservedNode(),
                preemptedNm.getNodeId());

            // Step 4: grow the node's total resource by its allocated resource.
            schedulerNode = cs.getNodeTracker().getNode(preemptedNm.getNodeId());
            curResource = schedulerNode.getAllocatedResource();
            schedulerNode.updateTotalResource(Resources.add(schedulerNode.getTotalResource(), curResource));
            reAddNodeAndSendUpdate(rm, preemptedNm, schedulerNode);

            // Wait for the launcher to finish either way, then surface its failure (if any).
            GenericTestUtils.waitFor(() -> result.get() || launchFailure.get() != null, 10, 20000);
            if (launchFailure.get() != null) {
                throw new AssertionError("Failed to launch app-2", launchFailure.get());
            }

            schedulerApp = cs.getApplicationAttempt(app2AttemptId);
            Assert.assertEquals("App2 failed to get Allocated",
                1, schedulerApp.getLiveContainers().size());
            Assert.assertEquals("App2 failed to Unreserve",
                0, schedulerApp.getReservedContainers().size());
        } finally {
            // Always release the launcher thread and the mock RM, even when the test fails.
            if (am2Launcher != null) {
                am2Launcher.interrupt();
            }
            rm.stop();
        }
    }

    /**
     * Removes the scheduler node registered under {@code nm}'s node id from the
     * CapacityScheduler's node tracker, adds {@code schedulerNode} back, and then
     * delivers a {@link NodeUpdateSchedulerEvent} for that node to the scheduler.
     *
     * @param rm            the running mock resource manager
     * @param nm            the node manager whose node id identifies the node
     * @param schedulerNode the (already modified) scheduler node to re-add
     */
    private static void reAddNodeAndSendUpdate(MockRM rm, MockNM nm, FiCaSchedulerNode schedulerNode) {
        CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
        cs.getNodeTracker().removeNode(nm.getNodeId());
        cs.getNodeTracker().addNode(schedulerNode);
        RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
    }
}

