/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.uam;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.AMHeartbeatRequestHandler;
import org.apache.hadoop.yarn.server.AMRMClientRelayer;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.uam.UnmanagedAMPoolManager;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.util.AsyncCallback;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestUnmanagedApplicationManager {
    private static final Logger LOG = LoggerFactory.getLogger(TestUnmanagedApplicationManager.class);
    private TestableUnmanagedApplicationManager uam;
    private Configuration conf = new YarnConfiguration();
    private CountingCallback callback;
    private ApplicationAttemptId attemptId;
    private UnmanagedAMPoolManager uamPool;
    private ExecutorService threadpool;

    @Before
    public void setup() {
        this.conf.set("yarn.resourcemanager.cluster-id", "subclusterId");
        this.callback = new CountingCallback();
        this.attemptId = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)0L, (int)1), (int)1);
        this.uam = new TestableUnmanagedApplicationManager(this.conf, this.attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, "rm", null);
        this.threadpool = Executors.newCachedThreadPool();
        this.uamPool = new TestableUnmanagedAMPoolManager(this.threadpool);
        this.uamPool.init(this.conf);
        this.uamPool.start();
    }

    @After
    public void tearDown() throws IOException, InterruptedException {
        if (this.uam != null) {
            this.uam.shutDownConnections();
            this.uam = null;
        }
        if (this.uamPool != null) {
            if (this.uamPool.isInState(Service.STATE.STARTED)) {
                this.uamPool.stop();
            }
            this.uamPool = null;
        }
        if (this.threadpool != null) {
            this.threadpool.shutdownNow();
            this.threadpool = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForCallBackCountAndCheckZeroPending(CountingCallback callBack, int expectCallBackCount) {
        CountingCallback countingCallback = callBack;
        synchronized (countingCallback) {
            while (callBack.callBackCount != expectCallBackCount) {
                try {
                    callBack.wait();
                }
                catch (InterruptedException interruptedException) {}
            }
            Assert.assertEquals((String)("Non zero pending requests when number of allocate callbacks reaches " + expectCallBackCount), (long)0L, (long)callBack.requestQueueSize);
        }
    }

    @Test(timeout=10000L)
    public void testBasicUsage() throws YarnException, IOException, InterruptedException {
        this.launchUAM(this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.waitForCallBackCountAndCheckZeroPending(this.callback, 1);
        this.finishApplicationMaster(FinishApplicationMasterRequest.newInstance(null, null, null), this.attemptId);
        while (this.uam.isHeartbeatThreadAlive()) {
            LOG.info("waiting for heartbeat thread to finish");
            Thread.sleep(100L);
        }
    }

    @Test(timeout=5000L)
    public void testUAMReAttach() throws YarnException, IOException, InterruptedException {
        this.launchUAM(this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.waitForCallBackCountAndCheckZeroPending(this.callback, 1);
        MockResourceManagerFacade rmProxy = this.uam.getRMProxy();
        this.uam = new TestableUnmanagedApplicationManager(this.conf, this.attemptId.getApplicationId(), null, "submitter", "appNameSuffix", true, "rm");
        this.uam.setRMProxy(rmProxy);
        this.reAttachUAM(null, this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.waitForCallBackCountAndCheckZeroPending(this.callback, 2);
        this.finishApplicationMaster(FinishApplicationMasterRequest.newInstance(null, null, null), this.attemptId);
    }

    @Test(timeout=5000L)
    public void testReRegister() throws YarnException, IOException, InterruptedException {
        this.launchUAM(this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.uam.setShouldReRegisterNext();
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.waitForCallBackCountAndCheckZeroPending(this.callback, 1);
        this.uam.setShouldReRegisterNext();
        this.finishApplicationMaster(FinishApplicationMasterRequest.newInstance(null, null, null), this.attemptId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testSlowRegisterCall() throws YarnException, IOException, InterruptedException {
        Object syncObj;
        Thread registerAMThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    TestUnmanagedApplicationManager.this.launchUAM(TestUnmanagedApplicationManager.this.attemptId);
                    TestUnmanagedApplicationManager.this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)1001, null), TestUnmanagedApplicationManager.this.attemptId);
                }
                catch (Exception e) {
                    LOG.info("Register thread exception", (Throwable)e);
                }
            }
        });
        Object object = syncObj = MockResourceManagerFacade.getRegisterSyncObj();
        synchronized (object) {
            LOG.info("Starting register thread");
            registerAMThread.start();
            try {
                LOG.info("Test main starts waiting");
                syncObj.wait();
                LOG.info("Test main wait finished");
            }
            catch (Exception e) {
                LOG.info("Test main wait interrupted", (Throwable)e);
            }
        }
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        object = syncObj;
        synchronized (object) {
            syncObj.notifyAll();
        }
        LOG.info("Test main wait for register thread to finish");
        registerAMThread.join();
        LOG.info("Register thread finished");
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.waitForCallBackCountAndCheckZeroPending(this.callback, 2);
        this.finishApplicationMaster(FinishApplicationMasterRequest.newInstance(null, null, null), this.attemptId);
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
        Assert.assertEquals((long)0L, (long)this.callback.requestQueueSize);
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        Assert.assertEquals((long)2L, (long)this.callback.callBackCount);
    }

    @Test(expected=Exception.class)
    public void testAllocateWithoutRegister() throws YarnException, IOException, InterruptedException {
        this.allocateAsync(AllocateRequest.newInstance((int)0, (float)0.0f, null, null, null), this.callback, this.attemptId);
    }

    @Test(expected=Exception.class)
    public void testFinishWithoutRegister() throws YarnException, IOException, InterruptedException {
        this.finishApplicationMaster(FinishApplicationMasterRequest.newInstance(null, null, null), this.attemptId);
    }

    @Test(timeout=10000L)
    public void testForceKill() throws YarnException, IOException, InterruptedException {
        this.launchUAM(this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.uam.forceKillApplication();
        while (this.uam.isHeartbeatThreadAlive()) {
            LOG.info("waiting for heartbeat thread to finish");
            Thread.sleep(100L);
        }
        try {
            this.uam.forceKillApplication();
            Assert.fail((String)"Should fail because application is already killed");
        }
        catch (YarnException yarnException) {
            // empty catch block
        }
    }

    @Test(timeout=10000L)
    public void testShutDownConnections() throws YarnException, IOException, InterruptedException {
        this.launchUAM(this.attemptId);
        this.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance(null, (int)0, null), this.attemptId);
        this.uam.shutDownConnections();
        while (this.uam.isHeartbeatThreadAlive()) {
            LOG.info("waiting for heartbeat thread to finish");
            Thread.sleep(100L);
        }
    }

    protected UserGroupInformation getUGIWithToken(ApplicationAttemptId appAttemptId) {
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser((String)appAttemptId.toString());
        AMRMTokenIdentifier token = new AMRMTokenIdentifier(appAttemptId, 1);
        ugi.addTokenIdentifier((TokenIdentifier)token);
        return ugi;
    }

    protected Token<AMRMTokenIdentifier> launchUAM(ApplicationAttemptId appAttemptId) throws IOException, InterruptedException {
        return (Token)this.getUGIWithToken(appAttemptId).doAs(() -> this.uam.launchUAM());
    }

    protected void reAttachUAM(Token<AMRMTokenIdentifier> uamToken, ApplicationAttemptId appAttemptId) throws IOException, InterruptedException {
        this.getUGIWithToken(appAttemptId).doAs(() -> {
            this.uam.reAttachUAM(uamToken);
            return null;
        });
    }

    protected RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest request, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException {
        return (RegisterApplicationMasterResponse)this.getUGIWithToken(appAttemptId).doAs(() -> this.uam.registerApplicationMaster(request));
    }

    protected void allocateAsync(AllocateRequest request, AsyncCallback<AllocateResponse> callBack, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException {
        this.getUGIWithToken(appAttemptId).doAs(() -> {
            this.uam.allocateAsync(request, callBack);
            return null;
        });
    }

    protected FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest request, ApplicationAttemptId appAttemptId) throws YarnException, IOException, InterruptedException {
        return (FinishApplicationMasterResponse)this.getUGIWithToken(appAttemptId).doAs(() -> this.uam.finishApplicationMaster(request));
    }

    @Test
    public void testSeparateThreadWithoutBlockServiceStop() throws Exception {
        ApplicationAttemptId attemptId1 = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)Time.now(), (int)1), (int)1);
        Token token1 = this.uamPool.launchUAM("SC-1", this.conf, attemptId1.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-1", null);
        Assert.assertNotNull((Object)token1);
        ApplicationAttemptId attemptId2 = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)Time.now(), (int)2), (int)1);
        Token token2 = this.uamPool.launchUAM("SC-2", this.conf, attemptId2.getApplicationId(), "default", "test-user", "SC-HOME", true, "SC-2", null);
        Assert.assertNotNull((Object)token2);
        Map unmanagedAppMasterMap = this.uamPool.getUnmanagedAppMasterMap();
        Assert.assertNotNull((Object)unmanagedAppMasterMap);
        Assert.assertEquals((long)2L, (long)unmanagedAppMasterMap.size());
        this.uamPool.stop();
        Assert.assertTrue((boolean)this.uamPool.waitForServiceToStop(2000L));
        Assert.assertEquals((Object)Service.STATE.STOPPED, (Object)this.uamPool.getServiceState());
        Thread finishApplicationThread = this.uamPool.getFinishApplicationThread();
        GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), (long)100L, (long)2000L);
        Assert.assertEquals((long)0L, (long)unmanagedAppMasterMap.size());
    }

    @Test
    public void testApplicationAttributes() throws IOException, YarnException, InterruptedException, TimeoutException {
        long now = Time.now();
        ApplicationId applicationId = ApplicationId.newInstance((long)now, (int)10);
        ApplicationSubmissionContext appSubmissionContext = ApplicationSubmissionContext.newInstance((ApplicationId)applicationId, (String)"test", (String)"default", (Priority)Priority.newInstance((int)10), null, (boolean)true, (boolean)true, (int)2, (Resource)Resource.newInstance((int)10, (int)2), (String)"test");
        Set<String> tags = Collections.singleton("1");
        appSubmissionContext.setApplicationTags(tags);
        Token token1 = this.uamPool.launchUAM("SC-1", this.conf, applicationId, "default", "test-user", "SC-HOME", true, "SC-1", appSubmissionContext);
        Assert.assertNotNull((Object)token1);
        Map unmanagedAppMasterMap = this.uamPool.getUnmanagedAppMasterMap();
        UnmanagedApplicationManager uamApplicationManager = (UnmanagedApplicationManager)unmanagedAppMasterMap.get("SC-1");
        Assert.assertNotNull((Object)uamApplicationManager);
        ApplicationSubmissionContext appSubmissionContextByUam = uamApplicationManager.getApplicationSubmissionContext();
        Assert.assertNotNull((Object)appSubmissionContext);
        Assert.assertEquals((long)10L, (long)appSubmissionContextByUam.getPriority().getPriority());
        Assert.assertEquals((Object)"test", (Object)appSubmissionContextByUam.getApplicationType());
        Assert.assertEquals((long)1L, (long)appSubmissionContextByUam.getApplicationTags().size());
        this.uamPool.stop();
        Thread finishApplicationThread = this.uamPool.getFinishApplicationThread();
        GenericTestUtils.waitFor(() -> !finishApplicationThread.isAlive(), (long)100L, (long)2000L);
        Assert.assertEquals((long)0L, (long)unmanagedAppMasterMap.size());
    }

    protected class TestableUnmanagedAMPoolManager
    extends UnmanagedAMPoolManager {
        public TestableUnmanagedAMPoolManager(ExecutorService threadpool) {
            super(threadpool);
        }

        public UnmanagedApplicationManager createUAM(Configuration configuration, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, String rmId, ApplicationSubmissionContext originalAppSubmissionContext) {
            return new TestableUnmanagedApplicationManager(configuration, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, rmId, originalAppSubmissionContext);
        }
    }

    public class TestableAMRequestHandlerThread
    extends AMHeartbeatRequestHandler {
        public TestableAMRequestHandlerThread(Configuration conf, ApplicationId applicationId, AMRMClientRelayer rmProxyRelayer) {
            super(conf, applicationId, rmProxyRelayer);
        }

        public void run() {
            try {
                TestUnmanagedApplicationManager.this.getUGIWithToken(TestUnmanagedApplicationManager.this.attemptId).doAs(() -> {
                    TestableAMRequestHandlerThread.super.run();
                    return null;
                });
            }
            catch (Exception e) {
                LOG.error("Exception running TestableAMRequestHandlerThread", (Throwable)e);
            }
        }
    }

    public class TestableUnmanagedApplicationManager
    extends UnmanagedApplicationManager {
        private MockResourceManagerFacade rmProxy;

        public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, String rmName) {
            this(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, rmName, null);
        }

        public TestableUnmanagedApplicationManager(Configuration conf, ApplicationId appId, String queueName, String submitter, String appNameSuffix, boolean keepContainersAcrossApplicationAttempts, String rmName, ApplicationSubmissionContext originalApplicationSubmissionContext) {
            super(conf, appId, queueName, submitter, appNameSuffix, keepContainersAcrossApplicationAttempts, rmName, originalApplicationSubmissionContext);
        }

        protected AMHeartbeatRequestHandler createAMHeartbeatRequestHandler(Configuration config, ApplicationId appId, AMRMClientRelayer rmProxyRelayer) {
            return new TestableAMRequestHandlerThread(config, appId, rmProxyRelayer);
        }

        protected <T> T createRMProxy(Class<T> protocol, Configuration config, UserGroupInformation user, Token<AMRMTokenIdentifier> token) {
            if (this.rmProxy == null) {
                this.rmProxy = new MockResourceManagerFacade(config, 0);
            }
            return (T)this.rmProxy;
        }

        public void setShouldReRegisterNext() {
            if (this.rmProxy != null) {
                this.rmProxy.setShouldReRegisterNext();
            }
        }

        public MockResourceManagerFacade getRMProxy() {
            return this.rmProxy;
        }

        public void setRMProxy(MockResourceManagerFacade proxy) {
            this.rmProxy = proxy;
        }
    }

    protected class CountingCallback
    implements AsyncCallback<AllocateResponse> {
        private int callBackCount;
        private int requestQueueSize;

        protected CountingCallback() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void callback(AllocateResponse response) {
            CountingCallback countingCallback = this;
            synchronized (countingCallback) {
                ++this.callBackCount;
                this.requestQueueSize = TestUnmanagedApplicationManager.this.uam.getRequestQueueSize();
                this.notifyAll();
            }
        }
    }
}

