/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.rm;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.TaskScheduler;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers;
import org.apache.tez.dag.app.rm.TezAMRMClientAsync;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeMap;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestContainerReuse {
    private static final Log LOG = LogFactory.getLog(TestContainerReuse.class);

    @Test(timeout=15000L)
    public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration conf = new Configuration((Configuration)new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", false);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", false);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 3000L);
        RackResolver.init((Configuration)conf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance((String)"0", (int)0, (int)0);
        TezVertexID vertexID = TezVertexID.getInstance((TezDAGID)dagID, (int)1);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)dagID).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(conf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource = Resource.newInstance((int)1024, (int)1);
        Priority priority = Priority.newInstance((int)5);
        String[] host1 = new String[]{"host1"};
        String[] host2 = new String[]{"host2"};
        String[] defaultRack = new String[]{"/default-rack"};
        TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)1), (int)1);
        TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)2), (int)1);
        TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)3), (int)1);
        TaskAttempt ta11 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        TaskAttempt ta21 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        TaskAttempt ta31 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrTa11 = this.createLaunchRequestEvent(taID11, ta11, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest lrTa21 = this.createLaunchRequestEvent(taID21, ta21, resource, host2, defaultRack, priority);
        AMSchedulerEventTALaunchRequest lrTa31 = this.createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa11);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa21);
        Container containerHost1 = this.createContainer(1, host1[0], resource, priority);
        Container containerHost2 = this.createContainer(2, host2[0], resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Lists.newArrayList((Object[])new Container[]{containerHost1, containerHost2}));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta11), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost1));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta21), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost2));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa31);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta11), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler, (VerificationMode)Mockito.times((int)1))).taskAllocated(Matchers.eq((Object)ta31), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)containerHost1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
        long currentTs = System.currentTimeMillis();
        Throwable exception = null;
        while (System.currentTimeMillis() < currentTs + 5000L) {
            try {
                ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler, (VerificationMode)Mockito.times((int)1))).containerBeingReleased((ContainerId)Matchers.eq((Object)containerHost2.getId()));
                exception = null;
                break;
            }
            catch (Throwable e) {
                exception = e;
            }
        }
        Assert.assertTrue((String)"containerHost2 was not released", (exception == null ? 1 : 0) != 0);
        taskScheduler.stop();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout=15000L)
    public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration conf = new Configuration((Configuration)new YarnConfiguration());
        conf.setBoolean("tez.am.container.reuse.enabled", true);
        conf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", false);
        conf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", false);
        conf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 1000L);
        RackResolver.init((Configuration)conf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance((String)"0", (int)0, (int)0);
        TezVertexID vertexID = TezVertexID.getInstance((TezDAGID)dagID, (int)1);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)dagID).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(conf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource = Resource.newInstance((int)1024, (int)1);
        Priority priority = Priority.newInstance((int)5);
        String[] host1 = new String[]{"host1"};
        String[] host2 = new String[]{"host2"};
        String[] defaultRack = new String[]{"/default-rack"};
        TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)1), (int)1);
        TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)2), (int)1);
        TezTaskAttemptID taID31 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)3), (int)1);
        TaskAttempt ta11 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        TaskAttempt ta21 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        TaskAttempt ta31 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrTa11 = this.createLaunchRequestEvent(taID11, ta11, resource, host1, defaultRack, priority);
        AMSchedulerEventTALaunchRequest lrTa21 = this.createLaunchRequestEvent(taID21, ta21, resource, host2, defaultRack, priority);
        AMSchedulerEventTALaunchRequest lrTa31 = this.createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa11);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa21);
        Container containerHost1 = this.createContainer(1, host1[0], resource, priority);
        Container containerHost2 = this.createContainer(2, host2[0], resource, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Lists.newArrayList((Object[])new Container[]{containerHost1, containerHost2}));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta11), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost1));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta21), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost2));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrTa31);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta21), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler, (VerificationMode)Mockito.times((int)0))).taskAllocated(Matchers.eq((Object)ta31), Matchers.any(Object.class), (Container)Matchers.eq((Object)containerHost2));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)1))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)containerHost2.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        taskScheduler.stop();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout=10000L)
    public void testSimpleReuse() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration tezConf = new Configuration((Configuration)new YarnConfiguration());
        tezConf.setBoolean("tez.am.container.reuse.enabled", true);
        tezConf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        tezConf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 0L);
        tezConf.setLong("tez.am.container.session.delay-allocation-millis", 0L);
        RackResolver.init((Configuration)tezConf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance((String)"0", (int)0, (int)0);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)dagID).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(tezConf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource1 = Resource.newInstance((int)1024, (int)1);
        String[] host1 = new String[]{"host1"};
        String[] host2 = new String[]{"host2"};
        String[] racks = new String[]{"/default-rack"};
        Priority priority1 = Priority.newInstance((int)1);
        TezVertexID vertexID1 = TezVertexID.getInstance((TezDAGID)dagID, (int)1);
        TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID1, (int)1), (int)1);
        TaskAttempt ta11 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent1 = this.createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, priority1);
        TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID1, (int)2), (int)1);
        TaskAttempt ta12 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent2 = this.createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
        TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID1, (int)3), (int)1);
        TaskAttempt ta13 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent3 = this.createLaunchRequestEvent(taID13, ta13, resource1, host2, racks, priority1);
        TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID1, (int)4), (int)1);
        TaskAttempt ta14 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent4 = this.createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent1);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent2);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent3);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent4);
        Container container1 = this.createContainer(1, "host1", resource1, priority1);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(container1));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta11), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta11), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta12), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta12), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta13), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED));
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler, (VerificationMode)Mockito.times((int)0))).taskAllocated(Matchers.eq((Object)ta14), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta13), Matchers.eq((boolean)false));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient)).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        eventHandler.reset();
        Container container2 = this.createContainer(2, "host2", resource1, priority1);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(container2));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta14), Matchers.any(Object.class), (Container)Matchers.eq((Object)container2));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta14), Matchers.eq((boolean)true));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient)).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container2.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        eventHandler.reset();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout=30000L)
    public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration tezConf = new Configuration((Configuration)new YarnConfiguration());
        tezConf.setBoolean("tez.am.container.reuse.enabled", true);
        tezConf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        tezConf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", true);
        tezConf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 100L);
        tezConf.setLong("tez.am.container.session.delay-allocation-millis", 10000L);
        RackResolver.init((Configuration)tezConf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance((String)"0", (int)0, (int)0);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)dagID).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(tezConf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource1 = Resource.newInstance((int)1024, (int)1);
        String[] emptyHosts = new String[]{};
        String[] racks = new String[]{"default-rack"};
        Priority priority = Priority.newInstance((int)3);
        TezVertexID vertexID = TezVertexID.getInstance((TezDAGID)dagID, (int)1);
        TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)1), (int)1);
        TaskAttempt ta11 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        ((TaskAttempt)Mockito.doReturn((Object)vertexID).when((Object)ta11)).getVertexID();
        AMSchedulerEventTALaunchRequest lrEvent11 = this.createLaunchRequestEvent(taID11, ta11, resource1, emptyHosts, racks, priority);
        TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID, (int)2), (int)1);
        TaskAttempt ta12 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        ((TaskAttempt)Mockito.doReturn((Object)vertexID).when((Object)ta12)).getVertexID();
        AMSchedulerEventTALaunchRequest lrEvent12 = this.createLaunchRequestEvent(taID12, ta12, resource1, emptyHosts, racks, priority);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent11);
        Container container1 = this.createContainer(1, "randomHost", resource1, priority);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(container1));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta11), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent12);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta11), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler, (VerificationMode)Mockito.times((int)0))).taskAllocated(Matchers.eq((Object)ta12), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();
        LOG.info((Object)"Sleeping to ensure that the scheduling loop runs");
        Thread.sleep(6000L);
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta12), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient)).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout=30000L)
    public void testReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration tezConf = new Configuration((Configuration)new YarnConfiguration());
        tezConf.setBoolean("tez.am.container.reuse.enabled", true);
        tezConf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 1L);
        tezConf.setLong("tez.am.container.session.delay-allocation-millis", 2000L);
        RackResolver.init((Configuration)tezConf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID = TezDAGID.getInstance((String)"0", (int)0, (int)0);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)true).when((Object)appContext)).isSession();
        ((AppContext)Mockito.doReturn((Object)dagID).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(tezConf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource1 = Resource.newInstance((int)1024, (int)1);
        String[] host1 = new String[]{"host1"};
        String[] racks = new String[]{"/default-rack"};
        Priority priority1 = Priority.newInstance((int)3);
        Priority priority2 = Priority.newInstance((int)4);
        TezVertexID vertexID1 = TezVertexID.getInstance((TezDAGID)dagID, (int)1);
        TezVertexID vertexID2 = TezVertexID.getInstance((TezDAGID)dagID, (int)2);
        TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID1, (int)1), (int)1);
        TaskAttempt ta11 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        ((TaskAttempt)Mockito.doReturn((Object)vertexID1).when((Object)ta11)).getVertexID();
        AMSchedulerEventTALaunchRequest lrEvent11 = this.createLaunchRequestEvent(taID11, ta11, resource1, host1, racks, priority1);
        TezTaskAttemptID taID21 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID2, (int)1), (int)1);
        TaskAttempt ta21 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        ((TaskAttempt)Mockito.doReturn((Object)vertexID2).when((Object)ta21)).getVertexID();
        AMSchedulerEventTALaunchRequest lrEvent21 = this.createLaunchRequestEvent(taID21, ta21, resource1, host1, racks, priority2);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent11);
        Container container1 = this.createContainer(1, host1[0], resource1, priority1);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(container1));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta11), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent21);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta11), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta21), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta21, container1.getId(), TaskAttemptState.SUCCEEDED));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        LOG.info((Object)"Sleeping to ensure that the scheduling loop runs");
        Thread.sleep(6000L);
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient)).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    @Test(timeout=30000L)
    public void testReuseLocalResourcesChanged() throws IOException, InterruptedException, ExecutionException {
        AtomicBoolean drainNotifier;
        Configuration tezConf = new Configuration((Configuration)new YarnConfiguration());
        tezConf.setBoolean("tez.am.container.reuse.enabled", true);
        tezConf.setBoolean("tez.am.container.reuse.rack-fallback.enabled", true);
        tezConf.setBoolean("tez.am.container.reuse.non-local-fallback.enabled", true);
        tezConf.setLong("tez.am.container.reuse.locality.delay-allocation-millis", 0L);
        tezConf.setLong("tez.am.container.session.delay-allocation-millis", -1L);
        RackResolver.init((Configuration)tezConf);
        TaskScheduler.TaskSchedulerAppCallback mockApp = (TaskScheduler.TaskSchedulerAppCallback)Mockito.mock(TaskScheduler.TaskSchedulerAppCallback.class);
        TestTaskSchedulerHelpers.CapturingEventHandler eventHandler = new TestTaskSchedulerHelpers.CapturingEventHandler();
        TezDAGID dagID1 = TezDAGID.getInstance((String)"0", (int)1, (int)0);
        TestTaskSchedulerHelpers.AMRMClientForTest rmClientCore = new TestTaskSchedulerHelpers.AMRMClientForTest();
        TezAMRMClientAsync rmClient = (TezAMRMClientAsync)Mockito.spy((Object)((Object)new TestTaskSchedulerHelpers.AMRMClientAsyncForTest((AMRMClient<TaskScheduler.CookieContainerRequest>)rmClientCore, 100)));
        String appUrl = "url";
        String appMsg = "success";
        TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus finalStatus = new TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
        ((TaskScheduler.TaskSchedulerAppCallback)Mockito.doReturn((Object)finalStatus).when((Object)mockApp)).getFinalAppStatus();
        AppContext appContext = (AppContext)Mockito.mock(AppContext.class);
        ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
        AMContainerMap amContainerMap = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskAttemptListener)Mockito.mock(TaskAttemptListener.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), appContext);
        AMNodeMap amNodeMap = new AMNodeMap((EventHandler)eventHandler, appContext);
        ((AppContext)Mockito.doReturn((Object)amContainerMap).when((Object)appContext)).getAllContainers();
        ((AppContext)Mockito.doReturn((Object)amNodeMap).when((Object)appContext)).getAllNodes();
        ((AppContext)Mockito.doReturn((Object)DAGAppMasterState.RUNNING).when((Object)appContext)).getAMState();
        ((AppContext)Mockito.doReturn((Object)true).when((Object)appContext)).isSession();
        ((AppContext)Mockito.doAnswer((Answer)dagIDAnswer).when((Object)appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)Mockito.mock(ClusterInfo.class)).when((Object)appContext)).getClusterInfo();
        TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest taskSchedulerEventHandlerReal = new TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest(appContext, eventHandler, (TezAMRMClientAsync<TaskScheduler.CookieContainerRequest>)rmClient, new TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher());
        TaskSchedulerEventHandler taskSchedulerEventHandler = (TaskSchedulerEventHandler)Mockito.spy((Object)((Object)taskSchedulerEventHandlerReal));
        taskSchedulerEventHandler.init(tezConf);
        taskSchedulerEventHandler.start();
        TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback taskScheduler = (TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest)taskSchedulerEventHandler).getSpyTaskScheduler();
        TestTaskSchedulerHelpers.TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
        taskScheduler.delayedContainerManager.drainedDelayedContainers = drainNotifier = new AtomicBoolean(false);
        Resource resource1 = Resource.newInstance((int)1024, (int)1);
        String[] host1 = new String[]{"host1"};
        String[] racks = new String[]{"/default-rack"};
        Priority priority1 = Priority.newInstance((int)1);
        String rsrc1 = "rsrc1";
        String rsrc2 = "rsrc2";
        String rsrc3 = "rsrc3";
        LocalResource lr1 = (LocalResource)Mockito.mock(LocalResource.class);
        LocalResource lr2 = (LocalResource)Mockito.mock(LocalResource.class);
        LocalResource lr3 = (LocalResource)Mockito.mock(LocalResource.class);
        AMContainerEventAssignTA assignEvent = null;
        HashMap dag1LRs = Maps.newHashMap();
        dag1LRs.put(rsrc1, lr1);
        TezVertexID vertexID11 = TezVertexID.getInstance((TezDAGID)dagID1, (int)1);
        TezTaskAttemptID taID111 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID11, (int)1), (int)1);
        TaskAttempt ta111 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent11 = this.createLaunchRequestEvent(taID111, ta111, resource1, host1, racks, priority1, dag1LRs);
        TezTaskAttemptID taID112 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID11, (int)2), (int)1);
        TaskAttempt ta112 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent12 = this.createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent11);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent12);
        Container container1 = this.createContainer(1, "host1", resource1, priority1);
        drainNotifier.set(false);
        taskScheduler.onContainersAllocated(Collections.singletonList(container1));
        TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
        drainableAppCallback.drain();
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta111), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        assignEvent = (AMContainerEventAssignTA)eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
        Assert.assertEquals((long)1L, (long)assignEvent.getRemoteTaskLocalResources().size());
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta111), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta112), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        assignEvent = (AMContainerEventAssignTA)eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
        Assert.assertEquals((long)1L, (long)assignEvent.getRemoteTaskLocalResources().size());
        eventHandler.reset();
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta112), Matchers.eq((boolean)true));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        eventHandler.reset();
        TezDAGID dagID2 = TezDAGID.getInstance((String)"0", (int)2, (int)0);
        dagIDAnswer.setDAGID(dagID2);
        HashMap dag2LRs = Maps.newHashMap();
        dag2LRs.put(rsrc2, lr2);
        dag2LRs.put(rsrc3, lr3);
        TezVertexID vertexID21 = TezVertexID.getInstance((TezDAGID)dagID2, (int)1);
        TezTaskAttemptID taID211 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID21, (int)1), (int)1);
        TaskAttempt ta211 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent21 = this.createLaunchRequestEvent(taID211, ta211, resource1, host1, racks, priority1, dag2LRs);
        TezTaskAttemptID taID212 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vertexID21, (int)2), (int)1);
        TaskAttempt ta212 = (TaskAttempt)Mockito.mock(TaskAttempt.class);
        AMSchedulerEventTALaunchRequest lrEvent22 = this.createLaunchRequestEvent(taID212, ta212, resource1, host1, racks, priority1, dag2LRs);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent21);
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)lrEvent22);
        drainableAppCallback.drain();
        LOG.info((Object)"Sleeping to ensure that the scheduling loop runs");
        Thread.sleep(6000L);
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta211), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        assignEvent = (AMContainerEventAssignTA)eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
        Assert.assertEquals((long)2L, (long)assignEvent.getRemoteTaskLocalResources().size());
        eventHandler.reset();
        taskSchedulerEventHandler.handleEvent((AMSchedulerEvent)new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED));
        drainableAppCallback.drain();
        ((TestTaskSchedulerHelpers.TaskSchedulerWithDrainableAppCallback)((Object)Mockito.verify((Object)((Object)taskScheduler)))).deallocateTask(Matchers.eq((Object)ta211), Matchers.eq((boolean)true));
        ((TaskSchedulerEventHandler)Mockito.verify((Object)taskSchedulerEventHandler)).taskAllocated(Matchers.eq((Object)ta212), Matchers.any(Object.class), (Container)Matchers.eq((Object)container1));
        ((TezAMRMClientAsync)Mockito.verify((Object)rmClient, (VerificationMode)Mockito.times((int)0))).releaseAssignedContainer((ContainerId)Matchers.eq((Object)container1.getId()));
        eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
        assignEvent = (AMContainerEventAssignTA)eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
        Assert.assertEquals((long)2L, (long)assignEvent.getRemoteTaskLocalResources().size());
        eventHandler.reset();
        taskScheduler.close();
        taskSchedulerEventHandler.close();
    }

    private Container createContainer(int id, String host, Resource resource, Priority priority) {
        ContainerId containerID = ContainerId.newInstance((ApplicationAttemptId)ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)1L, (int)1), (int)1), (int)id);
        NodeId nodeID = NodeId.newInstance((String)host, (int)0);
        Container container = Container.newInstance((ContainerId)containerID, (NodeId)nodeID, (String)(host + ":0"), (Resource)resource, (Priority)priority, null);
        return container;
    }

    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) {
        AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TaskSpec(taID, "dagName", "vertexName", new ProcessorDescriptor("processorClassName"), Collections.singletonList(new InputSpec("vertexName", new InputDescriptor("inputClassName"), 1)), Collections.singletonList(new OutputSpec("vertexName", new OutputDescriptor("outputClassName"), 1)), null), ta, hosts, racks, priority, containerContext);
        return lr;
    }

    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority) {
        return this.createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, new HashMap<String, LocalResource>());
    }

    private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority, Map<String, LocalResource> localResources) {
        return this.createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, new ContainerContext(localResources, new Credentials(), new HashMap(), ""));
    }

    private static class ChangingDAGIDAnswer
    implements Answer<TezDAGID> {
        private TezDAGID dagID;

        public ChangingDAGIDAnswer(TezDAGID dagID) {
            this.dagID = dagID;
        }

        public void setDAGID(TezDAGID dagID) {
            this.dagID = dagID;
        }

        public TezDAGID answer(InvocationOnMock invocation) throws Throwable {
            return this.dagID;
        }
    }
}

