/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Test;

/**
 * Tests that the ResourceManager tells a NodeManager, through heartbeat
 * responses, which containers and applications it should clean up.
 *
 * <p>Each test starts its own {@link MockRM} and always stops it again, even
 * when an assertion fails, so that a failing test does not leave a running
 * ResourceManager behind.
 */
public class TestApplicationCleanup {
    private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class);

    /** Maximum number of polling rounds before a wait loop gives up. */
    private static final int MAX_WAIT_ROUNDS = 200;

    /** Pause between two polling rounds, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    /**
     * Runs an application with two extra containers to completion and checks
     * that the node is eventually asked to clean up two containers and exactly
     * one application (the one that was submitted).
     *
     * @throws Exception if any of the mock RM/NM/AM interactions fail
     */
    @Test
    public void testAppCleanup() throws Exception {
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        MockRM rm = new MockRM();
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
            RMApp app = rm.submitApp(2000);
            nm1.nodeHeartbeat(true);
            RMAppAttempt attempt = app.getCurrentAppAttempt();
            MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
            am.registerAppAttempt();

            // Ask for containers, then poll until all of them were handed out.
            int request = 2;
            am.allocate("127.0.0.1", 1000, request, new ArrayList<ContainerId>());
            nm1.nodeHeartbeat(true);
            List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
                    new ArrayList<ContainerId>()).getAllocatedContainers();
            int contReceived = conts.size();
            int waitCount = 0;
            while (contReceived < request && waitCount++ < MAX_WAIT_ROUNDS) {
                LOG.info("Got " + contReceived + " containers. Waiting to get " + request);
                Thread.sleep(POLL_INTERVAL_MS);
                conts = am.allocate(new ArrayList<ResourceRequest>(),
                        new ArrayList<ContainerId>()).getAllocatedContainers();
                nm1.nodeHeartbeat(true);
                contReceived += conts.size();
            }
            Assert.assertEquals(request, contReceived);

            am.unregisterAppAttempt();
            nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
            am.waitForState(RMAppAttemptState.FINISHED);

            // Cleanup notifications may be spread over several heartbeats, so
            // accumulate them instead of looking only at the last response.
            NodeHeartbeatResponse resp = nm1.nodeHeartbeat(true);
            int cleanedConts = resp.getContainersToCleanup().size();
            List<ApplicationId> cleanedAppIds =
                    new ArrayList<ApplicationId>(resp.getApplicationsToCleanup());
            waitCount = 0;
            while ((cleanedConts < 2 || cleanedAppIds.size() < 1)
                    && waitCount++ < MAX_WAIT_ROUNDS) {
                LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleanedConts
                        + " cleanedApps: " + cleanedAppIds.size());
                Thread.sleep(POLL_INTERVAL_MS);
                resp = nm1.nodeHeartbeat(true);
                cleanedConts += resp.getContainersToCleanup().size();
                cleanedAppIds.addAll(resp.getApplicationsToCleanup());
            }

            // Size is checked first so that a timeout fails with a clear
            // assertion message rather than an index error on get(0).
            Assert.assertEquals(1, cleanedAppIds.size());
            Assert.assertEquals(app.getApplicationId(), cleanedAppIds.get(0));
            Assert.assertEquals(2, cleanedConts);
        } finally {
            rm.stop();
        }
    }

    /**
     * Releases one allocated container and checks that the node is told to
     * clean it up when it reports the container as running, both right after
     * the release and again on a later report of the same container.
     *
     * @throws Exception if any of the mock RM/NM/AM interactions fail
     */
    @Test
    public void testContainerCleanup() throws Exception {
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        final DrainDispatcher dispatcher = new DrainDispatcher();
        MockRM rm = new MockRM(){

            protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
                return new ResourceManager.SchedulerEventDispatcher(this.scheduler){

                    public void handle(SchedulerEvent event) {
                        // Hand the event to the scheduler directly on the calling thread.
                        scheduler.handle((Event)event);
                    }
                };
            }

            protected Dispatcher createDispatcher() {
                return dispatcher;
            }
        };
        rm.start();
        try {
            MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
            RMApp app = rm.submitApp(2000);
            nm1.nodeHeartbeat(true);
            RMAppAttempt attempt = app.getCurrentAppAttempt();
            MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
            am.registerAppAttempt();

            // Ask for containers, then poll until all of them were handed out.
            int request = 2;
            am.allocate("127.0.0.1", 1000, request, new ArrayList<ContainerId>());
            dispatcher.await();
            nm1.nodeHeartbeat(true);
            List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
                    new ArrayList<ContainerId>()).getAllocatedContainers();
            int contReceived = conts.size();
            int waitCount = 0;
            while (contReceived < request && waitCount++ < MAX_WAIT_ROUNDS) {
                LOG.info("Got " + contReceived + " containers. Waiting to get " + request);
                Thread.sleep(POLL_INTERVAL_MS);
                conts = am.allocate(new ArrayList<ResourceRequest>(),
                        new ArrayList<ContainerId>()).getAllocatedContainers();
                dispatcher.await();
                nm1.nodeHeartbeat(true);
                contReceived += conts.size();
            }
            Assert.assertEquals(request, contReceived);

            // Release the first container of the most recent allocation batch.
            ContainerId releasedId = conts.get(0).getId();
            ArrayList<ContainerId> release = new ArrayList<ContainerId>();
            release.add(releasedId);
            am.allocate(new ArrayList<ResourceRequest>(), release);
            dispatcher.await();

            // The node reports the released container as still running.
            HashMap<ApplicationId, List<ContainerStatus>> containerStatuses =
                    new HashMap<ApplicationId, List<ContainerStatus>>();
            ArrayList<ContainerStatus> containerStatusList = new ArrayList<ContainerStatus>();
            containerStatusList.add(BuilderUtils.newContainerStatus(
                    releasedId, ContainerState.RUNNING, "nothing", 0));
            containerStatuses.put(app.getApplicationId(), containerStatusList);
            NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
            dispatcher.await();
            List<ContainerId> cleaned = collectContainersToCleanup(nm1, dispatcher, resp);
            // Assert before touching get(0): on timeout the list is empty.
            Assert.assertEquals(1, cleaned.size());
            LOG.info("Got cleanup for " + cleaned.get(0));

            LOG.info("Testing container launch much after release and NM getting cleanup");
            containerStatuses.clear();
            containerStatusList.clear();
            containerStatusList.add(BuilderUtils.newContainerStatus(
                    releasedId, ContainerState.RUNNING, "nothing", 0));
            containerStatuses.put(app.getApplicationId(), containerStatusList);
            resp = nm1.nodeHeartbeat(containerStatuses, true);
            dispatcher.await();
            cleaned = collectContainersToCleanup(nm1, dispatcher, resp);
            Assert.assertEquals(1, cleaned.size());
            LOG.info("Got cleanup for " + cleaned.get(0));
        } finally {
            rm.stop();
        }
    }

    /**
     * Collects the containers the node is asked to clean up, starting with
     * {@code firstResponse} and sending further heartbeats until at least one
     * container was reported or {@link #MAX_WAIT_ROUNDS} rounds have passed.
     *
     * @param nm the node used to send follow-up heartbeats
     * @param dispatcher dispatcher that is awaited after every follow-up heartbeat
     * @param firstResponse the heartbeat response already received by the caller
     * @return all container ids reported for cleanup so far; empty on timeout
     * @throws Exception if a heartbeat fails or the wait is interrupted
     */
    private static List<ContainerId> collectContainersToCleanup(MockNM nm,
            DrainDispatcher dispatcher, NodeHeartbeatResponse firstResponse) throws Exception {
        List<ContainerId> cleaned =
                new ArrayList<ContainerId>(firstResponse.getContainersToCleanup());
        int waitCount = 0;
        while (cleaned.size() < 1 && waitCount++ < MAX_WAIT_ROUNDS) {
            LOG.info("Waiting to get cleanup events.. cleanedConts: " + cleaned.size());
            Thread.sleep(POLL_INTERVAL_MS);
            NodeHeartbeatResponse resp = nm.nodeHeartbeat(true);
            dispatcher.await();
            cleaned.addAll(resp.getContainersToCleanup());
        }
        return cleaned;
    }

    /**
     * Runs {@link #testAppCleanup()} outside of a test runner.
     *
     * @param args ignored
     * @throws Exception if the test fails
     */
    public static void main(String[] args) throws Exception {
        TestApplicationCleanup t = new TestApplicationCleanup();
        t.testAppCleanup();
    }
}

