/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.v2.app.launcher;

import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetLocalizationStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.util.Records;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestContainerLauncher {
    private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
    Configuration conf;
    Server server;
    static final Logger LOG = LoggerFactory.getLogger(TestContainerLauncher.class);

    @Test(timeout=10000L)
    public void testPoolSize() throws InterruptedException {
        TaskAttemptId taskAttemptId;
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)67);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)3);
        JobId jobId = MRBuilderUtils.newJobId((ApplicationId)appId, (int)8);
        TaskId taskId = MRBuilderUtils.newTaskId((JobId)jobId, (int)9, (TaskType)TaskType.MAP);
        AppContext context = (AppContext)Mockito.mock(AppContext.class);
        CustomContainerLauncher containerLauncher = new CustomContainerLauncher(context);
        containerLauncher.init(new Configuration());
        containerLauncher.start();
        ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
        Assertions.assertThat((int)containerLauncher.initialPoolSize).isEqualTo(10);
        Assert.assertEquals((long)0L, (long)threadPool.getPoolSize());
        Assert.assertEquals((long)containerLauncher.initialPoolSize, (long)threadPool.getCorePoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize;
        for (int i = 0; i < 10; ++i) {
            ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)i);
            taskAttemptId = MRBuilderUtils.newTaskAttemptId((TaskId)taskId, (int)i);
            containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
        }
        this.waitForEvents(containerLauncher, 10);
        Assert.assertEquals((long)10L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.finishEventHandling = true;
        int timeOut = 0;
        while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
            LOG.info("Waiting for number of events processed to become 10. It is now " + containerLauncher.numEventsProcessed.get() + ". Timeout is " + timeOut);
            Thread.sleep(1000L);
        }
        Assert.assertEquals((long)10L, (long)containerLauncher.numEventsProcessed.get());
        containerLauncher.finishEventHandling = false;
        for (int i = 0; i < 10; ++i) {
            ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)(i + 10));
            TaskAttemptId taskAttemptId2 = MRBuilderUtils.newTaskAttemptId((TaskId)taskId, (int)(i + 10));
            containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId2, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
        }
        this.waitForEvents(containerLauncher, 20);
        Assert.assertEquals((long)10L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.expectedCorePoolSize = 11 + containerLauncher.initialPoolSize;
        containerLauncher.finishEventHandling = false;
        ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)21L);
        taskAttemptId = MRBuilderUtils.newTaskAttemptId((TaskId)taskId, (int)21);
        containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host11:1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
        this.waitForEvents(containerLauncher, 21);
        Assert.assertEquals((long)11L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.stop();
        Configuration conf = new Configuration();
        conf.setInt("yarn.app.mapreduce.am.containerlauncher.threadpool-initial-size", 20);
        containerLauncher = new CustomContainerLauncher(context);
        containerLauncher.init(conf);
        Assertions.assertThat((int)containerLauncher.initialPoolSize).isEqualTo(20);
    }

    @Test(timeout=5000L)
    public void testPoolLimits() throws InterruptedException {
        int i;
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)67);
        ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)3);
        JobId jobId = MRBuilderUtils.newJobId((ApplicationId)appId, (int)8);
        TaskId taskId = MRBuilderUtils.newTaskId((JobId)jobId, (int)9, (TaskType)TaskType.MAP);
        TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId((TaskId)taskId, (int)0);
        ContainerId containerId = ContainerId.newContainerId((ApplicationAttemptId)appAttemptId, (long)10L);
        AppContext context = (AppContext)Mockito.mock(AppContext.class);
        CustomContainerLauncher containerLauncher = new CustomContainerLauncher(context);
        Configuration conf = new Configuration();
        conf.setInt("yarn.app.mapreduce.am.containerlauncher.thread-count-limit", 12);
        containerLauncher.init(conf);
        containerLauncher.start();
        ThreadPoolExecutor threadPool = containerLauncher.getThreadPool();
        containerLauncher.expectedCorePoolSize = containerLauncher.initialPoolSize;
        for (i = 0; i < 10; ++i) {
            containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
        }
        this.waitForEvents(containerLauncher, 10);
        Assert.assertEquals((long)10L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.expectedCorePoolSize = 12;
        for (i = 1; i <= 4; ++i) {
            containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId, containerId, "host1" + i + ":1234", null, ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
        }
        this.waitForEvents(containerLauncher, 12);
        Assert.assertEquals((long)12L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.finishEventHandling = true;
        this.waitForEvents(containerLauncher, 14);
        Assert.assertEquals((long)12L, (long)threadPool.getPoolSize());
        Assert.assertNull((Object)containerLauncher.foundErrors);
        containerLauncher.stop();
    }

    private void waitForEvents(CustomContainerLauncher containerLauncher, int expectedNumEvents) throws InterruptedException {
        int timeOut = 0;
        while (containerLauncher.numEventsProcessing.get() < expectedNumEvents && timeOut++ < 20) {
            LOG.info("Waiting for number of events to become " + expectedNumEvents + ". It is now " + containerLauncher.numEventsProcessing.get());
            Thread.sleep(1000L);
        }
        Assert.assertEquals((long)expectedNumEvents, (long)containerLauncher.numEventsProcessing.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=15000L)
    public void testSlowNM() throws Exception {
        this.conf = new Configuration();
        int maxAttempts = 1;
        this.conf.setInt("mapreduce.map.maxattempts", maxAttempts);
        this.conf.setBoolean("mapreduce.job.ubertask.enable", false);
        this.conf.setInt("yarn.rpc.nm-command-timeout", 3000);
        this.conf.set("yarn.ipc.rpc.class", HadoopYarnProtoRPC.class.getName());
        YarnRPC rpc = YarnRPC.create((Configuration)this.conf);
        String bindAddr = "localhost:0";
        InetSocketAddress addr = NetUtils.createSocketAddr((String)bindAddr);
        NMTokenSecretManagerInNM tokenSecretManager = new NMTokenSecretManagerInNM();
        MasterKey masterKey = (MasterKey)Records.newRecord(MasterKey.class);
        masterKey.setBytes(ByteBuffer.wrap("key".getBytes()));
        tokenSecretManager.setMasterKey(masterKey);
        this.conf.set("hadoop.security.authentication", "token");
        this.server = rpc.getServer(ContainerManagementProtocol.class, (Object)new DummyContainerManager(), addr, this.conf, (SecretManager)tokenSecretManager, 1);
        this.server.start();
        MRAppWithSlowNM app = new MRAppWithSlowNM(tokenSecretManager);
        try {
            Job job = app.submit(this.conf);
            app.waitForState(job, JobState.RUNNING);
            Map tasks = job.getTasks();
            Assert.assertEquals((String)"Num tasks is not correct", (long)1L, (long)tasks.size());
            Task task = (Task)tasks.values().iterator().next();
            app.waitForState(task, TaskState.SCHEDULED);
            Map attempts = ((Task)tasks.values().iterator().next()).getAttempts();
            Assert.assertEquals((String)"Num attempts is not correct", (long)maxAttempts, (long)attempts.size());
            TaskAttempt attempt = (TaskAttempt)attempts.values().iterator().next();
            app.waitForInternalState((TaskAttemptImpl)attempt, TaskAttemptStateInternal.ASSIGNED);
            app.waitForState(job, JobState.FAILED);
            String diagnostics = attempt.getDiagnostics().toString();
            LOG.info("attempt.getDiagnostics: " + diagnostics);
            Assert.assertTrue((boolean)diagnostics.contains("Container launch failed for container_0_0000_01_000000 : "));
            Assert.assertTrue((boolean)diagnostics.contains("java.net.SocketTimeoutException: 3000 millis timeout while waiting for channel"));
        }
        finally {
            this.server.stop();
            app.stop();
        }
    }

    public class DummyContainerManager
    implements ContainerManagementProtocol {
        private ContainerStatus status = null;

        public GetContainerStatusesResponse getContainerStatuses(GetContainerStatusesRequest request) throws IOException {
            ArrayList<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
            statuses.add(this.status);
            return GetContainerStatusesResponse.newInstance(statuses, null);
        }

        public StartContainersResponse startContainers(StartContainersRequest requests) throws IOException {
            StartContainerRequest request = (StartContainerRequest)requests.getStartContainerRequests().get(0);
            ContainerTokenIdentifier containerTokenIdentifier = MRApp.newContainerTokenIdentifier(request.getContainerToken());
            Assert.assertEquals((Object)(MRApp.NM_HOST + ":" + MRApp.NM_PORT), (Object)containerTokenIdentifier.getNmHostAddress());
            StartContainersResponse response = (StartContainersResponse)recordFactory.newRecordInstance(StartContainersResponse.class);
            this.status = (ContainerStatus)recordFactory.newRecordInstance(ContainerStatus.class);
            try {
                Thread.sleep(15000L);
            }
            catch (Exception e) {
                LOG.error("Setup thread sleep interrupted: ", (Throwable)e);
                throw new UndeclaredThrowableException(e);
            }
            this.status.setState(ContainerState.RUNNING);
            this.status.setContainerId(containerTokenIdentifier.getContainerID());
            this.status.setExitStatus(0);
            return response;
        }

        public StopContainersResponse stopContainers(StopContainersRequest request) throws IOException {
            Exception e = new Exception("Dummy function", new Exception("Dummy function cause"));
            throw new IOException(e);
        }

        @Deprecated
        public IncreaseContainersResourceResponse increaseContainersResource(IncreaseContainersResourceRequest request) throws IOException, IOException {
            Exception e = new Exception("Dummy function", new Exception("Dummy function cause"));
            throw new IOException(e);
        }

        public SignalContainerResponse signalToContainer(SignalContainerRequest request) throws YarnException, IOException {
            return null;
        }

        public ResourceLocalizationResponse localize(ResourceLocalizationRequest request) throws YarnException, IOException {
            return null;
        }

        public ReInitializeContainerResponse reInitializeContainer(ReInitializeContainerRequest request) throws YarnException, IOException {
            return null;
        }

        public RestartContainerResponse restartContainer(ContainerId containerId) throws YarnException, IOException {
            return null;
        }

        public RollbackResponse rollbackLastReInitialization(ContainerId containerId) throws YarnException, IOException {
            return null;
        }

        public CommitResponse commitLastReInitialization(ContainerId containerId) throws YarnException, IOException {
            return null;
        }

        public ContainerUpdateResponse updateContainer(ContainerUpdateRequest request) throws YarnException, IOException {
            return null;
        }

        public GetLocalizationStatusesResponse getLocalizationStatuses(GetLocalizationStatusesRequest request) throws YarnException, IOException {
            return null;
        }
    }

    private class MRAppWithSlowNM
    extends MRApp {
        private NMTokenSecretManagerInNM tokenSecretManager;

        public MRAppWithSlowNM(NMTokenSecretManagerInNM tokenSecretManager) {
            super(1, 0, false, "TestContainerLauncher", true);
            this.tokenSecretManager = tokenSecretManager;
        }

        @Override
        protected ContainerLauncher createContainerLauncher(AppContext context) {
            return new ContainerLauncherImpl(context){

                public ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(String containerMgrBindAddr, ContainerId containerId) throws IOException {
                    ContainerManagementProtocolProxy cmProxy;
                    InetSocketAddress addr = NetUtils.getConnectAddress((Server)TestContainerLauncher.this.server);
                    String containerManagerBindAddr = addr.getHostName() + ":" + addr.getPort();
                    Token token = MRAppWithSlowNM.this.tokenSecretManager.createNMToken(containerId.getApplicationAttemptId(), NodeId.newInstance((String)addr.getHostName(), (int)addr.getPort()), "user");
                    ContainerManagementProtocolProxy containerManagementProtocolProxy = cmProxy = new ContainerManagementProtocolProxy(TestContainerLauncher.this.conf);
                    Objects.requireNonNull(containerManagementProtocolProxy);
                    ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData proxy = new ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData(containerManagementProtocolProxy, YarnRPC.create((Configuration)TestContainerLauncher.this.conf), containerManagerBindAddr, containerId, token);
                    return proxy;
                }
            };
        }
    }

    private final class CustomContainerLauncher
    extends ContainerLauncherImpl {
        private volatile int expectedCorePoolSize;
        private AtomicInteger numEventsProcessing;
        private AtomicInteger numEventsProcessed;
        private volatile String foundErrors;
        private volatile boolean finishEventHandling;

        private CustomContainerLauncher(AppContext context) {
            super(context);
            this.expectedCorePoolSize = 0;
            this.numEventsProcessing = new AtomicInteger(0);
            this.numEventsProcessed = new AtomicInteger(0);
            this.foundErrors = null;
        }

        public ThreadPoolExecutor getThreadPool() {
            return this.launcherPool;
        }

        protected ContainerLauncherImpl.EventProcessor createEventProcessor(ContainerLauncherEvent event) {
            if (this.expectedCorePoolSize != this.launcherPool.getCorePoolSize()) {
                this.foundErrors = "Expected " + this.expectedCorePoolSize + " but found " + this.launcherPool.getCorePoolSize();
            }
            return new CustomEventProcessor(event);
        }

        private final class CustomEventProcessor
        extends ContainerLauncherImpl.EventProcessor {
            private final ContainerLauncherEvent event;

            private CustomEventProcessor(ContainerLauncherEvent event) {
                super((ContainerLauncherImpl)CustomContainerLauncher.this, event);
                this.event = event;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void run() {
                ContainerLauncherImpl.LOG.info("Processing the event " + this.event.toString());
                CustomContainerLauncher.this.numEventsProcessing.incrementAndGet();
                while (!CustomContainerLauncher.this.finishEventHandling) {
                    CustomEventProcessor customEventProcessor = this;
                    synchronized (customEventProcessor) {
                        try {
                            ((Object)((Object)this)).wait(1000L);
                        }
                        catch (InterruptedException interruptedException) {
                            // empty catch block
                        }
                    }
                }
                CustomContainerLauncher.this.numEventsProcessed.incrementAndGet();
            }
        }
    }
}

