/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.junit.Test;

public class TestAMRestart {
    @Test
    public void testAMRestartWithExistingContainers() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        conf.setInt("yarn.resourcemanager.am.max-attempts", 2);
        MockRM rm1 = new MockRM((Configuration)conf);
        rm1.start();
        RMApp app1 = rm1.submitApp(200, "name", "user", new HashMap<ApplicationAccessType, String>(), false, "default", -1, null, "MAPREDUCE", false, true);
        MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
        nm1.registerNode();
        MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService());
        nm2.registerNode();
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        int NUM_CONTAINERS = 3;
        am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, new ArrayList<ContainerId>());
        nm1.nodeHeartbeat(true);
        List containers = am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers();
        while (containers.size() != NUM_CONTAINERS) {
            nm1.nodeHeartbeat(true);
            containers.addAll(am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>()).getAllocatedContainers());
            Thread.sleep(200L);
        }
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
        ContainerId containerId2 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)2);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
        ContainerId containerId3 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)3);
        rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
        ContainerId containerId4 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)4);
        rm1.waitForState(nm1, containerId4, RMContainerState.ACQUIRED);
        am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
        nm1.nodeHeartbeat(true);
        ContainerId containerId5 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)5);
        rm1.waitForContainerAllocated(nm1, containerId5);
        rm1.waitForState(nm1, containerId5, RMContainerState.ALLOCATED);
        am1.allocate("127.0.0.1", 6000, 1, new ArrayList<ContainerId>());
        ContainerId containerId6 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)6);
        nm1.nodeHeartbeat(true);
        FiCaSchedulerApp schedulerAttempt = ((CapacityScheduler)rm1.getResourceScheduler()).getCurrentAttemptForContainer(containerId6);
        while (schedulerAttempt.getReservedContainers().size() == 0) {
            System.out.println("Waiting for container " + containerId6 + " to be reserved.");
            nm1.nodeHeartbeat(true);
            Thread.sleep(200L);
        }
        Assert.assertEquals((Object)containerId6, (Object)((RMContainer)schedulerAttempt.getReservedContainers().get(0)).getContainerId());
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
        am1.waitForState(RMAppAttemptState.FAILED);
        Thread.sleep(3000L);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        Assert.assertNull((Object)rm1.getResourceScheduler().getRMContainer(containerId4));
        Assert.assertNull((Object)rm1.getResourceScheduler().getRMContainer(containerId5));
        rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
        ApplicationAttemptId newAttemptId = app1.getCurrentAppAttempt().getAppAttemptId();
        Assert.assertFalse((boolean)newAttemptId.equals((Object)am1.getApplicationAttemptId()));
        RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
        nm1.nodeHeartbeat(true);
        MockAM am2 = rm1.sendAMLaunched(attempt2.getAppAttemptId());
        RegisterApplicationMasterResponse registerResponse = am2.registerAppAttempt();
        Assert.assertEquals((int)2, (int)registerResponse.getContainersFromPreviousAttempts().size());
        boolean containerId2Exists = false;
        boolean containerId3Exists = false;
        for (Container container : registerResponse.getContainersFromPreviousAttempts()) {
            if (container.getId().equals((Object)containerId2)) {
                containerId2Exists = true;
            }
            if (!container.getId().equals((Object)containerId3)) continue;
            containerId3Exists = true;
        }
        Assert.assertTrue((containerId2Exists && containerId3Exists ? 1 : 0) != 0);
        rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.COMPLETE);
        RMAppAttempt newAttempt = app1.getRMAppAttempt(am2.getApplicationAttemptId());
        this.waitForContainersToFinish(4, newAttempt);
        boolean container3Exists = false;
        boolean container4Exists = false;
        boolean container5Exists = false;
        boolean container6Exists = false;
        for (ContainerStatus status : newAttempt.getJustFinishedContainers()) {
            if (status.getContainerId().equals((Object)containerId3)) {
                container3Exists = true;
            }
            if (status.getContainerId().equals((Object)containerId4)) {
                container4Exists = true;
            }
            if (status.getContainerId().equals((Object)containerId5)) {
                container5Exists = true;
            }
            if (!status.getContainerId().equals((Object)containerId6)) continue;
            container6Exists = true;
        }
        Assert.assertTrue((container3Exists && container4Exists && container5Exists && container6Exists ? 1 : 0) != 0);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        FiCaSchedulerApp schedulerNewAttempt = ((CapacityScheduler)rm1.getResourceScheduler()).getCurrentAttemptForContainer(containerId2);
        MockRM.finishAMAndVerifyAppState(app1, rm1, nm1, am2);
        Assert.assertFalse((boolean)schedulerNewAttempt.getLiveContainers().contains(containerId2));
        System.out.println("New attempt's just finished containers: " + newAttempt.getJustFinishedContainers());
        this.waitForContainersToFinish(5, newAttempt);
        rm1.stop();
    }

    private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt) throws InterruptedException {
        for (int count = 0; attempt.getJustFinishedContainers().size() != expectedNum && count < 500; ++count) {
            Thread.sleep(100L);
        }
    }

    @Test
    public void testNMTokensRebindOnAMRestart() throws Exception {
        YarnConfiguration conf = new YarnConfiguration();
        conf.setInt("yarn.resourcemanager.am.max-attempts", 3);
        MockRM rm1 = new MockRM((Configuration)conf);
        rm1.start();
        RMApp app1 = rm1.submitApp(200, "myname", "myuser", new HashMap<ApplicationAccessType, String>(), false, "default", -1, null, "MAPREDUCE", false, true);
        MockNM nm1 = new MockNM("127.0.0.1:1234", 8000, rm1.getResourceTrackerService());
        nm1.registerNode();
        MockNM nm2 = new MockNM("127.1.1.1:4321", 8000, rm1.getResourceTrackerService());
        nm2.registerNode();
        MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
        ArrayList containers = new ArrayList();
        ArrayList expectedNMTokens = new ArrayList();
        while (true) {
            AllocateResponse response = am1.allocate("127.0.0.1", 2000, 2, new ArrayList<ContainerId>());
            nm1.nodeHeartbeat(true);
            containers.addAll(response.getAllocatedContainers());
            expectedNMTokens.addAll(response.getNMTokens());
            if (containers.size() == 2) break;
            Thread.sleep(200L);
            System.out.println("Waiting for container to be allocated.");
        }
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
        ContainerId containerId2 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)2);
        rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 3, ContainerState.RUNNING);
        ContainerId containerId3 = ContainerId.newInstance((ApplicationAttemptId)am1.getApplicationAttemptId(), (int)3);
        rm1.waitForState(nm1, containerId3, RMContainerState.RUNNING);
        nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
        am1.waitForState(RMAppAttemptState.FAILED);
        rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
        MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
        RegisterApplicationMasterResponse registerResponse = am2.registerAppAttempt();
        rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
        Assert.assertEquals(expectedNMTokens, (Object)registerResponse.getNMTokensFromPreviousAttempts());
        containers = new ArrayList();
        while (true) {
            AllocateResponse allocateResponse = am2.allocate("127.1.1.1", 4000, 1, new ArrayList<ContainerId>());
            nm2.nodeHeartbeat(true);
            containers.addAll(allocateResponse.getAllocatedContainers());
            expectedNMTokens.addAll(allocateResponse.getNMTokens());
            if (containers.size() == 1) break;
            Thread.sleep(200L);
            System.out.println("Waiting for container to be allocated.");
        }
        nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 2, ContainerState.RUNNING);
        ContainerId am2ContainerId2 = ContainerId.newInstance((ApplicationAttemptId)am2.getApplicationAttemptId(), (int)2);
        rm1.waitForState(nm1, am2ContainerId2, RMContainerState.RUNNING);
        nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
        am2.waitForState(RMAppAttemptState.FAILED);
        rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
        MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
        registerResponse = am3.registerAppAttempt();
        rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
        List transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
        Assert.assertEquals((int)2, (int)transferredTokens.size());
        Assert.assertTrue((boolean)transferredTokens.containsAll(expectedNMTokens));
        rm1.stop();
    }
}

