/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.TestApplicationCleanup;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestApplicationCleanup {
    /** Logger used for the progress messages the tests emit while polling. */
    private static final Log LOG = LogFactory.getLog(TestApplicationCleanup.class);
    /** Configuration assigned in {@link #setup()}; read by the RM-restart tests of this class. */
    private YarnConfiguration conf;

    /**
     * Builds the configuration shared by the tests: root logger at DEBUG, RM recovery
     * switched on, and {@link MemoryRMStateStore} selected as the state store.
     *
     * @throws UnknownHostException declared for interface compatibility; nothing in the
     *         visible body throws it directly
     */
    @Before
    public void setup() throws UnknownHostException {
        LogManager.getRootLogger().setLevel(Level.DEBUG);

        this.conf = new YarnConfiguration();
        UserGroupInformation.setConfiguration(this.conf);

        // Named constants instead of hand-typed keys, so a typo cannot silently
        // configure nothing.
        this.conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
        this.conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());

        // The restart tests lower the AM attempt limit to 1; that override only means
        // something while the shipped default is larger. (The previous assertTrue(true)
        // asserted nothing.)
        Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
    }

    /**
     * Runs one application to completion on a single mock node and checks that the node's
     * heartbeat responses eventually ask it to clean up the two allocated containers and
     * the application itself.
     *
     * <p>The ResourceManager is stopped in a {@code finally} block so that a failed
     * assertion or an interrupted wait does not leave it running for later tests.
     *
     * @throws Exception if any mock-cluster call fails or a sleep is interrupted
     */
    @Test
    public void testAppCleanup() throws Exception {
        LogManager.getRootLogger().setLevel(Level.DEBUG);

        MockRM rm = new MockRM();
        try {
            rm.start();
            MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
            RMApp app = rm.submitApp(2000);
            nm1.nodeHeartbeat(true);

            RMAppAttempt attempt = app.getCurrentAppAttempt();
            MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
            am.registerAppAttempt();

            // Ask for two containers, then poll (at most 200 x 100 ms) until both arrive.
            int request = 2;
            am.allocate("127.0.0.1", 1000, request, new ArrayList<ContainerId>());
            nm1.nodeHeartbeat(true);
            List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
                    new ArrayList<ContainerId>()).getAllocatedContainers();
            int contReceived = conts.size();
            int waitCount = 0;
            while (contReceived < request && waitCount++ < 200) {
                LOG.info("Got " + contReceived + " containers. Waiting to get " + request);
                Thread.sleep(100L);
                conts = am.allocate(new ArrayList<ResourceRequest>(),
                        new ArrayList<ContainerId>()).getAllocatedContainers();
                contReceived += conts.size();
                nm1.nodeHeartbeat(true);
            }
            Assert.assertEquals(request, contReceived);

            // Finish the attempt: unregister, report container 1 as COMPLETE, wait for FINISHED.
            am.unregisterAppAttempt();
            nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
            am.waitForState(RMAppAttemptState.FINISHED);

            // Accumulate cleanup instructions across heartbeats. The lists are copied so
            // the accumulation never mutates a list owned by a heartbeat response.
            NodeHeartbeatResponse resp = nm1.nodeHeartbeat(true);
            List<ContainerId> containersToCleanup =
                    new ArrayList<ContainerId>(resp.getContainersToCleanup());
            List<ApplicationId> appsToCleanup =
                    new ArrayList<ApplicationId>(resp.getApplicationsToCleanup());
            int numCleanedContainers = containersToCleanup.size();
            int numCleanedApps = appsToCleanup.size();
            waitCount = 0;
            while ((numCleanedContainers < 2 || numCleanedApps < 1) && waitCount++ < 200) {
                LOG.info("Waiting to get cleanup events.. cleanedConts: " + numCleanedContainers
                        + " cleanedApps: " + numCleanedApps);
                Thread.sleep(100L);
                resp = nm1.nodeHeartbeat(true);
                containersToCleanup.addAll(resp.getContainersToCleanup());
                appsToCleanup.addAll(resp.getApplicationsToCleanup());
                numCleanedContainers = containersToCleanup.size();
                numCleanedApps = appsToCleanup.size();
            }

            Assert.assertEquals(1, appsToCleanup.size());
            Assert.assertEquals(app.getApplicationId(), appsToCleanup.get(0));
            Assert.assertEquals(1, numCleanedApps);
            Assert.assertEquals(2, numCleanedContainers);
        } finally {
            rm.stop();
        }
    }

    /**
     * Exercises cleanup of a single released container: the AM obtains two containers,
     * releases the first one, and the node then heartbeats while still reporting that
     * container as RUNNING. The test expects the heartbeat responses to list one container
     * to clean up, and then repeats the same RUNNING report and expectation a second time.
     *
     * <p>NOTE(review): this method does not compile as decompiled -- see the note on the
     * {@code rm} declaration below.
     *
     * @throws Exception if any mock-cluster call fails or a sleep is interrupted
     */
    @Test
    public void testContainerCleanup() throws Exception {
        int cleanedConts;
        int contReceived;
        Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.DEBUG);
        DrainDispatcher dispatcher = new DrainDispatcher();
        // NOTE(review): the decompiler could not recover the anonymous class that built
        // `rm`; the next line is not valid Java. Presumably it was a MockRM subclass wired
        // to `dispatcher` (the test calls dispatcher.await() after each RM interaction) --
        // restore the body from the original source before building.
        1 rm = new /* Unavailable Anonymous Inner Class!! */;
        rm.start();
        MockNM nm1 = rm.registerNode("127.0.0.1:1234", 5000);
        RMApp app = rm.submitApp(2000);
        nm1.nodeHeartbeat(true);
        RMAppAttempt attempt = app.getCurrentAppAttempt();
        MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
        am.registerAppAttempt();
        // Request two containers, then poll (at most 200 x 100 ms) until both are received.
        int request = 2;
        am.allocate("127.0.0.1", 1000, request, new ArrayList());
        // presumably await() blocks until the dispatcher has drained its queued events --
        // TODO confirm against DrainDispatcher
        dispatcher.await();
        nm1.nodeHeartbeat(true);
        List conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers();
        int waitCount = 0;
        for (contReceived = conts.size(); contReceived < request && waitCount++ < 200; contReceived += conts.size()) {
            LOG.info((Object)("Got " + contReceived + " containers. Waiting to get " + request));
            Thread.sleep(100L);
            conts = am.allocate(new ArrayList(), new ArrayList()).getAllocatedContainers();
            dispatcher.await();
            nm1.nodeHeartbeat(true);
        }
        Assert.assertEquals((long)request, (long)contReceived);
        // Release the first container of the most recent allocation batch.
        ArrayList<ContainerId> release = new ArrayList<ContainerId>();
        release.add(((Container)conts.get(0)).getId());
        am.allocate(new ArrayList(), release);
        dispatcher.await();
        // Heartbeat from the node still reporting the released container as RUNNING.
        HashMap containerStatuses = new HashMap();
        ArrayList<ContainerStatus> containerStatusList = new ArrayList<ContainerStatus>();
        containerStatusList.add(BuilderUtils.newContainerStatus((ContainerId)((Container)conts.get(0)).getId(), (ContainerState)ContainerState.RUNNING, (String)"nothing", (int)0));
        containerStatuses.put(app.getApplicationId(), containerStatusList);
        NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
        dispatcher.await();
        // Poll heartbeats (at most 200 x 100 ms) until one container is listed for cleanup.
        List contsToClean = resp.getContainersToCleanup();
        waitCount = 0;
        for (cleanedConts = contsToClean.size(); cleanedConts < 1 && waitCount++ < 200; cleanedConts += contsToClean.size()) {
            LOG.info((Object)("Waiting to get cleanup events.. cleanedConts: " + cleanedConts));
            Thread.sleep(100L);
            resp = nm1.nodeHeartbeat(true);
            dispatcher.await();
            contsToClean = resp.getContainersToCleanup();
        }
        // NOTE(review): if the loop above gave up with an empty list, get(0) fails here
        // before the assertion below gets a chance to report the problem.
        LOG.info((Object)("Got cleanup for " + contsToClean.get(0)));
        Assert.assertEquals((long)1L, (long)cleanedConts);
        LOG.info((Object)"Testing container launch much after release and NM getting cleanup");
        // Second round: report the same container as RUNNING again and expect another
        // cleanup instruction.
        containerStatuses.clear();
        containerStatusList.clear();
        containerStatusList.add(BuilderUtils.newContainerStatus((ContainerId)((Container)conts.get(0)).getId(), (ContainerState)ContainerState.RUNNING, (String)"nothing", (int)0));
        containerStatuses.put(app.getApplicationId(), containerStatusList);
        resp = nm1.nodeHeartbeat(containerStatuses, true);
        dispatcher.await();
        contsToClean = resp.getContainersToCleanup();
        waitCount = 0;
        for (cleanedConts = contsToClean.size(); cleanedConts < 1 && waitCount++ < 200; cleanedConts += contsToClean.size()) {
            LOG.info((Object)("Waiting to get cleanup events.. cleanedConts: " + cleanedConts));
            Thread.sleep(100L);
            resp = nm1.nodeHeartbeat(true);
            dispatcher.await();
            contsToClean = resp.getContainersToCleanup();
        }
        // NOTE(review): same get(0)-on-possibly-empty-list hazard as in the first round.
        LOG.info((Object)("Got cleanup for " + contsToClean.get(0)));
        Assert.assertEquals((long)1L, (long)cleanedConts);
        // NOTE(review): not reached when an assertion above fails, so the RM is left running.
        rm.stop();
    }

    /**
     * Heartbeats {@code nm} once per second until a heartbeat response lists exactly one
     * application to clean up and that application is {@code appId}.
     *
     * <p>The loop has no upper bound of its own; the calling tests rely on their JUnit
     * timeout to stop it.
     *
     * @param nm    node whose heartbeat responses are inspected
     * @param appId application expected to be the only entry in the cleanup list
     * @throws Exception if a heartbeat fails or the sleep is interrupted
     */
    private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId) throws Exception {
        while (true) {
            final NodeHeartbeatResponse heartbeat = nm.nodeHeartbeat(true);
            final List<ApplicationId> pendingCleanup = heartbeat.getApplicationsToCleanup();

            final boolean onlyExpectedApp = pendingCleanup != null
                    && pendingCleanup.size() == 1
                    && appId.equals(pendingCleanup.get(0));
            if (onlyExpectedApp) {
                return;
            }

            LOG.info("Haven't got application=" + appId.toString() + " in cleanup list from node heartbeat response, " + "sleep for a while before next heartbeat");
            Thread.sleep(1000L);
        }
    }

    /**
     * Launches and registers a mock ApplicationMaster for the current attempt of
     * {@code app}, then waits until the RM reports the application as RUNNING.
     *
     * @param app application whose current attempt gets an AM
     * @param rm  ResourceManager that is told the AM has launched
     * @param nm  node that sends one heartbeat before the launch is reported
     * @return the registered mock AM
     * @throws Exception if any mock-cluster call fails
     */
    private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) throws Exception {
        // Look up the current attempt before the heartbeat, as the original did.
        final ApplicationAttemptId attemptId = app.getCurrentAppAttempt().getAppAttemptId();
        nm.nodeHeartbeat(true);

        final MockAM appMaster = rm.sendAMLaunched(attemptId);
        appMaster.registerAppAttempt();

        rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
        return appMaster;
    }

    /**
     * Fails an application on a first ResourceManager (single AM attempt, its container
     * reported COMPLETE), starts a second ResourceManager on the same in-memory state
     * store, re-registers the node with the application listed as running, and waits for
     * the node to be told to clean that application up.
     *
     * <p>Both ResourceManagers are stopped in {@code finally} blocks so that a failure or
     * the JUnit timeout does not leave them running.
     *
     * @throws Exception if any mock-cluster call fails or a wait is interrupted
     */
    @Test(timeout = 60000L)
    public void testAppCleanupWhenRMRestartedAfterAppFinished() throws Exception {
        // One attempt only, so the first AM failure fails the whole application.
        this.conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
        MemoryRMStateStore memStore = new MemoryRMStateStore();
        memStore.init(this.conf);

        MockRM rm1 = new MockRM(this.conf, memStore);
        MockRM rm2 = null;
        try {
            rm1.start();
            MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
            nm1.registerNode();

            RMApp app0 = rm1.submitApp(200);
            MockAM am0 = this.launchAM(app0, rm1, nm1);
            // Report container 1 of the attempt as COMPLETE and wait for the app to fail.
            nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
            rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);

            // Second RM shares the state store; the node re-registers against it.
            rm2 = new MockRM(this.conf, memStore);
            rm2.start();
            nm1.setResourceTrackerService(rm2.getResourceTrackerService());
            nm1.registerNode(Arrays.asList(app0.getApplicationId()));

            rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
            this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
        } finally {
            try {
                rm1.stop();
            } finally {
                if (rm2 != null) {
                    rm2.stop();
                }
            }
        }
    }

    /**
     * Starts an application with containers on two nodes, then starts a second
     * ResourceManager on the same in-memory state store while the application is still
     * running. The first node re-registers reporting container 1 of the attempt as
     * COMPLETE, the second re-registers with the application listed as running, and the
     * test waits for the application to become FAILED and for both nodes to be told to
     * clean it up.
     *
     * <p>Both ResourceManagers are stopped in {@code finally} blocks so that a failure or
     * the JUnit timeout does not leave them running.
     *
     * @throws Exception if any mock-cluster call fails or a wait is interrupted
     */
    @Test(timeout = 60000L)
    public void testAppCleanupWhenRMRestartedBeforeAppFinished() throws Exception {
        // One attempt only, so losing the AM fails the whole application.
        this.conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
        MemoryRMStateStore memStore = new MemoryRMStateStore();
        memStore.init(this.conf);

        MockRM rm1 = new MockRM(this.conf, memStore);
        MockRM rm2 = null;
        try {
            rm1.start();
            MockNM nm1 = new MockNM("127.0.0.1:1234", 1024, rm1.getResourceTrackerService());
            nm1.registerNode();
            MockNM nm2 = new MockNM("127.0.0.1:5678", 1024, rm1.getResourceTrackerService());
            nm2.registerNode();

            RMApp app0 = rm1.submitApp(200);
            MockAM am0 = this.launchAM(app0, rm1, nm1);

            // Request one 1024-sized container anywhere ("*"), then keep heartbeating nm2
            // and re-polling until a container is actually allocated.
            AllocateResponse allocResponse = am0.allocate(
                    Arrays.asList(ResourceRequest.newInstance(
                            Priority.newInstance(1), "*", Resource.newInstance(1024, 0), 1)),
                    null);
            while (null == allocResponse.getAllocatedContainers()
                    || allocResponse.getAllocatedContainers().isEmpty()) {
                nm2.nodeHeartbeat(true);
                allocResponse = am0.allocate(null, null);
                Thread.sleep(1000L);
            }

            // Second RM shares the state store; both nodes re-register against it.
            rm2 = new MockRM(this.conf, memStore);
            rm2.start();

            // nm1 reports container 1 of the attempt as COMPLETE
            // (presumably the AM container -- confirm).
            nm1.setResourceTrackerService(rm2.getResourceTrackerService());
            nm1.registerNode(
                    Arrays.asList(NMContainerStatus.newInstance(
                            ContainerId.newInstance(am0.getApplicationAttemptId(), 1),
                            ContainerState.COMPLETE,
                            Resource.newInstance(1024, 1),
                            "",
                            0,
                            Priority.newInstance(0),
                            1234L)),
                    Arrays.asList(app0.getApplicationId()));

            nm2.setResourceTrackerService(rm2.getResourceTrackerService());
            nm2.registerNode(Arrays.asList(app0.getApplicationId()));

            rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
            this.waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
            this.waitForAppCleanupMessageRecved(nm2, app0.getApplicationId());
        } finally {
            try {
                rm1.stop();
            } finally {
                if (rm2 != null) {
                    rm2.stop();
                }
            }
        }
    }

    /**
     * Command-line entry point that runs {@link #testAppCleanup()} on a fresh instance,
     * without going through the JUnit runner (so {@link #setup()} is not invoked).
     *
     * @param args ignored
     * @throws Exception whatever {@code testAppCleanup()} throws
     */
    public static void main(String[] args) throws Exception {
        new TestApplicationCleanup().testAppCleanup();
    }
}

