package org.apache.tez.dag.app.rm;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.Utils;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/rm/TaskSchedulerManager.class */
public class TaskSchedulerManager extends AbstractService implements EventHandler<AMSchedulerEvent> {
    static final Logger LOG;
    static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
    static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
    protected final AppContext appContext;
    private final EventHandler eventHandler;
    private final String historyUrl;
    private DAGAppMaster dagAppMaster;
    private Map<ApplicationAccessType, String> appAcls;
    private Thread eventHandlingThread;
    private volatile boolean stopEventHandling;
    protected volatile boolean isSignalled;
    final DAGClientServer clientService;
    private final ContainerSignatureMatcher containerSignatureMatcher;
    private int cachedNodeCount;
    private AtomicBoolean shouldUnregisterFlag;
    private final WebUIService webUI;
    private final NamedEntityDescriptor[] taskSchedulerDescriptors;
    protected final TaskSchedulerWrapper[] taskSchedulers;
    protected final ServicePluginLifecycleAbstractService[] taskSchedulerServiceWrappers;

    @VisibleForTesting
    final ExecutorService appCallbackExecutor;
    private final boolean isLocalMode;
    private final long SCHEDULER_APP_ID_BASE = 111101111;
    private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
    private final HadoopShim hadoopShim;
    BlockingQueue<AMSchedulerEvent> eventQueue;
    static final /* synthetic */ boolean $assertionsDisabled;

    @InterfaceAudience.Private
    @VisibleForTesting
    public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext, ContainerSignatureMatcher containerSignatureMatcher, DAGClientServer dAGClientServer, ExecutorService executorService) {
        super(TaskSchedulerManager.class.getName());
        this.appAcls = null;
        this.isSignalled = false;
        this.cachedNodeCount = -1;
        this.shouldUnregisterFlag = new AtomicBoolean(false);
        this.SCHEDULER_APP_ID_BASE = 111101111L;
        this.SCHEDULER_APP_ID_INCREMENT = 111111111L;
        this.eventQueue = new LinkedBlockingQueue();
        this.appContext = appContext;
        this.containerSignatureMatcher = containerSignatureMatcher;
        this.clientService = dAGClientServer;
        this.eventHandler = appContext.getEventHandler();
        this.appCallbackExecutor = executorService;
        this.taskSchedulers = new TaskSchedulerWrapper[]{new TaskSchedulerWrapper(taskScheduler)};
        this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[]{new ServicePluginLifecycleAbstractService(taskScheduler)};
        this.taskSchedulerDescriptors = null;
        this.webUI = null;
        this.historyUrl = null;
        this.isLocalMode = false;
        this.hadoopShim = new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim();
    }

    public TaskSchedulerManager(AppContext appContext, DAGClientServer dAGClientServer, EventHandler eventHandler, ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUIService, List<NamedEntityDescriptor> list, boolean z, HadoopShim hadoopShim) {
        super(TaskSchedulerManager.class.getName());
        this.appAcls = null;
        this.isSignalled = false;
        this.cachedNodeCount = -1;
        this.shouldUnregisterFlag = new AtomicBoolean(false);
        this.SCHEDULER_APP_ID_BASE = 111101111L;
        this.SCHEDULER_APP_ID_INCREMENT = 111111111L;
        this.eventQueue = new LinkedBlockingQueue();
        Preconditions.checkArgument((list == null || list.isEmpty()) ? false : true, "TaskSchedulerDescriptors must be specified");
        this.appContext = appContext;
        this.eventHandler = eventHandler;
        this.clientService = dAGClientServer;
        this.containerSignatureMatcher = containerSignatureMatcher;
        this.webUI = webUIService;
        this.historyUrl = getHistoryUrl();
        this.isLocalMode = z;
        this.hadoopShim = hadoopShim;
        this.appCallbackExecutor = createAppCallbackExecutorService();
        if (this.webUI != null) {
            this.webUI.setHistoryUrl(this.historyUrl);
        }
        this.taskSchedulerDescriptors = (NamedEntityDescriptor[]) list.toArray(new NamedEntityDescriptor[list.size()]);
        this.taskSchedulers = new TaskSchedulerWrapper[this.taskSchedulerDescriptors.length];
        this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
    }

    public Map<ApplicationAccessType, String> getApplicationAcls() {
        return this.appAcls;
    }

    public void setSignalled(boolean z) {
        this.isSignalled = z;
        LOG.info("TaskScheduler notified that iSignalled was : " + z);
    }

    public int getNumClusterNodes() {
        return this.cachedNodeCount;
    }

    public Resource getAvailableResources(int i) {
        try {
            return this.taskSchedulers[i].getAvailableResources();
        } catch (Exception e) {
            String str = "Error in TaskScheduler while getting available resources, schedule=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
            throw new RuntimeException(e);
        }
    }

    public Resource getTotalResources(int i) {
        try {
            return this.taskSchedulers[i].getTotalResources();
        } catch (Exception e) {
            String str = "Error in TaskScheduler while getting total resources, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
            throw new RuntimeException(e);
        }
    }

    private ExecutorService createAppCallbackExecutorService() {
        return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d").setDaemon(true).build());
    }

    public synchronized void handleEvent(AMSchedulerEvent aMSchedulerEvent) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Processing the event " + aMSchedulerEvent.toString());
        }
        switch ((AMSchedulerEventType) aMSchedulerEvent.getType()) {
            case S_TA_LAUNCH_REQUEST:
                handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) aMSchedulerEvent);
                return;
            case S_TA_STATE_UPDATED:
                handleTAStateUpdated((AMSchedulerEventTAStateUpdated) aMSchedulerEvent);
                return;
            case S_TA_ENDED:
                AMSchedulerEventTAEnded aMSchedulerEventTAEnded = (AMSchedulerEventTAEnded) aMSchedulerEvent;
                switch (aMSchedulerEventTAEnded.getState()) {
                    case FAILED:
                    case KILLED:
                        handleTAUnsuccessfulEnd(aMSchedulerEventTAEnded);
                        return;
                    case SUCCEEDED:
                        handleTASucceeded(aMSchedulerEventTAEnded);
                        return;
                    default:
                        throw new TezUncheckedException("Unexpected TA_ENDED state: " + aMSchedulerEventTAEnded.getState());
                }
            case S_CONTAINER_DEALLOCATE:
                handleContainerDeallocate((AMSchedulerEventDeallocateContainer) aMSchedulerEvent);
                return;
            case S_NODE_UNBLACKLISTED:
            case S_NODE_BLACKLISTED:
                handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate) aMSchedulerEvent);
                return;
            case S_NODE_UNHEALTHY:
            case S_NODE_HEALTHY:
            default:
                return;
        }
    }

    public void handle(AMSchedulerEvent aMSchedulerEvent) {
        int size = this.eventQueue.size();
        if (size != 0 && size % 1000 == 0) {
            LOG.info("Size of event-queue in RMContainerAllocator is " + size);
        }
        int remainingCapacity = this.eventQueue.remainingCapacity();
        if (remainingCapacity < 1000) {
            LOG.warn("Very low remaining capacity in the event-queue of RMContainerAllocator: " + remainingCapacity);
        }
        try {
            this.eventQueue.put(aMSchedulerEvent);
        } catch (InterruptedException e) {
            throw new TezUncheckedException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendEvent(Event<?> event) {
        this.eventHandler.handle(event);
    }

    private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate aMSchedulerEventNodeBlacklistUpdate) {
        boolean z = false;
        try {
            if (aMSchedulerEventNodeBlacklistUpdate.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
                this.taskSchedulers[aMSchedulerEventNodeBlacklistUpdate.getSchedulerId()].blacklistNode(aMSchedulerEventNodeBlacklistUpdate.getNodeId());
            } else if (aMSchedulerEventNodeBlacklistUpdate.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
                this.taskSchedulers[aMSchedulerEventNodeBlacklistUpdate.getSchedulerId()].unblacklistNode(aMSchedulerEventNodeBlacklistUpdate.getNodeId());
            } else {
                z = true;
            }
            if (z) {
                throw new TezUncheckedException("Invalid event type: " + aMSchedulerEventNodeBlacklistUpdate.getType());
            }
        } catch (Exception e) {
            String str = "Error in TaskScheduler for handling node blacklisting, eventType=" + aMSchedulerEventNodeBlacklistUpdate.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventNodeBlacklistUpdate.getSchedulerId(), this.appContext);
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
    }

    private void handleContainerDeallocate(AMSchedulerEventDeallocateContainer aMSchedulerEventDeallocateContainer) {
        ContainerId containerId = aMSchedulerEventDeallocateContainer.getContainerId();
        try {
            this.taskSchedulers[aMSchedulerEventDeallocateContainer.getSchedulerId()].deallocateContainer(containerId);
            sendEvent(new AMContainerEventStopRequest(containerId));
        } catch (Exception e) {
            String str = "Error in TaskScheduler for handling Container De-allocation, eventType=" + aMSchedulerEventDeallocateContainer.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventDeallocateContainer.getSchedulerId(), this.appContext) + ", containerId=" + containerId;
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
    }

    private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded aMSchedulerEventTAEnded) {
        TaskAttempt attempt = aMSchedulerEventTAEnded.getAttempt();
        try {
            boolean deallocateTask = this.taskSchedulers[aMSchedulerEventTAEnded.getSchedulerId()].deallocateTask(attempt, false, aMSchedulerEventTAEnded.getTaskAttemptEndReason(), aMSchedulerEventTAEnded.getDiagnostics());
            ContainerId assignedContainerID = attempt.getAssignedContainerID();
            if (!deallocateTask) {
                LOG.info("Task: " + attempt.getID() + " has no container assignment in the scheduler");
                if (assignedContainerID != null) {
                    LOG.error("No container allocated to task: " + attempt.getID() + " according to scheduler. Task reported container id: " + assignedContainerID);
                }
            }
            if (assignedContainerID != null) {
                sendEvent(new AMContainerEventStopRequest(assignedContainerID));
                sendEvent(new AMNodeEventTaskAttemptEnded(this.appContext.getAllContainers().get(assignedContainerID).getContainer().getNodeId(), aMSchedulerEventTAEnded.getSchedulerId(), assignedContainerID, attempt.getID(), aMSchedulerEventTAEnded.getState() == TaskAttemptState.FAILED));
            }
        } catch (Exception e) {
            String str = "Error in TaskScheduler for handling Task De-allocation, eventType=" + aMSchedulerEventTAEnded.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventTAEnded.getSchedulerId(), this.appContext) + ", taskAttemptId=" + attempt.getID();
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
    }

    private void handleTASucceeded(AMSchedulerEventTAEnded aMSchedulerEventTAEnded) {
        TaskAttempt attempt = aMSchedulerEventTAEnded.getAttempt();
        ContainerId usedContainerId = aMSchedulerEventTAEnded.getUsedContainerId();
        if (aMSchedulerEventTAEnded.getUsedContainerId() != null) {
            sendEvent(new AMContainerEventTASucceeded(usedContainerId, aMSchedulerEventTAEnded.getAttemptID()));
            sendEvent(new AMNodeEventTaskAttemptSucceeded(this.appContext.getAllContainers().get(usedContainerId).getContainer().getNodeId(), aMSchedulerEventTAEnded.getSchedulerId(), usedContainerId, aMSchedulerEventTAEnded.getAttemptID()));
        }
        try {
            if (this.taskSchedulers[aMSchedulerEventTAEnded.getSchedulerId()].deallocateTask(attempt, true, null, aMSchedulerEventTAEnded.getDiagnostics())) {
                return;
            }
            LOG.error("De-allocated successful task: " + attempt.getID() + ", but TaskScheduler reported no container assigned to task");
        } catch (Exception e) {
            String str = "Error in TaskScheduler for handling Task De-allocation, eventType=" + aMSchedulerEventTAEnded.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventTAEnded.getSchedulerId(), this.appContext) + ", taskAttemptId=" + attempt.getID();
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
    }

    private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest aMSchedulerEventTALaunchRequest) {
        TaskAttempt taskAttempt = aMSchedulerEventTALaunchRequest.getTaskAttempt();
        TaskLocationHint locationHint = aMSchedulerEventTALaunchRequest.getLocationHint();
        String[] strArr = null;
        String[] strArr2 = null;
        if (locationHint != null) {
            TaskLocationHint.TaskBasedLocationAffinity affinitizedTask = locationHint.getAffinitizedTask();
            if (affinitizedTask != null) {
                Vertex vertex = this.appContext.getCurrentDAG().getVertex(affinitizedTask.getVertexName());
                Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + affinitizedTask + " for attempt: " + taskAttempt.getID());
                int taskIndex = affinitizedTask.getTaskIndex();
                Preconditions.checkState(taskIndex >= 0 && taskIndex < vertex.getTotalTasks(), "Invalid taskIndex in task based affinity " + affinitizedTask + " for attempt: " + taskAttempt.getID());
                TaskAttempt successfulAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
                if (successfulAttempt != null) {
                    Preconditions.checkNotNull(successfulAttempt.getAssignedContainerID(), successfulAttempt.getID());
                    try {
                        this.taskSchedulers[aMSchedulerEventTALaunchRequest.getSchedulerId()].allocateTask(taskAttempt, aMSchedulerEventTALaunchRequest.getCapability(), successfulAttempt.getAssignedContainerID(), Priority.newInstance(aMSchedulerEventTALaunchRequest.getPriority()), aMSchedulerEventTALaunchRequest.getContainerContext(), aMSchedulerEventTALaunchRequest);
                        return;
                    } catch (Exception e) {
                        String str = "Error in TaskScheduler for handling Task Allocation, eventType=" + aMSchedulerEventTALaunchRequest.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventTALaunchRequest.getSchedulerId(), this.appContext) + ", taskAttemptId=" + taskAttempt.getID();
                        LOG.error(str, e);
                        sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
                        return;
                    }
                }
                LOG.info("No attempt for task affinity to " + affinitizedTask + " for attempt " + taskAttempt.getID() + " Ignoring.");
            } else {
                strArr = locationHint.getHosts() != null ? (String[]) locationHint.getHosts().toArray(new String[locationHint.getHosts().size()]) : null;
                strArr2 = locationHint.getRacks() != null ? (String[]) locationHint.getRacks().toArray(new String[locationHint.getRacks().size()]) : null;
            }
        }
        try {
            this.taskSchedulers[aMSchedulerEventTALaunchRequest.getSchedulerId()].allocateTask(taskAttempt, aMSchedulerEventTALaunchRequest.getCapability(), strArr, strArr2, Priority.newInstance(aMSchedulerEventTALaunchRequest.getPriority()), aMSchedulerEventTALaunchRequest.getContainerContext(), aMSchedulerEventTALaunchRequest);
        } catch (Exception e2) {
            String str2 = "Error in TaskScheduler for handling Task Allocation, eventType=" + aMSchedulerEventTALaunchRequest.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventTALaunchRequest.getSchedulerId(), this.appContext) + ", taskAttemptId=" + taskAttempt.getID();
            LOG.error(str2, e2);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str2, e2));
        }
    }

    private void handleTAStateUpdated(AMSchedulerEventTAStateUpdated aMSchedulerEventTAStateUpdated) {
        try {
            this.taskSchedulers[aMSchedulerEventTAStateUpdated.getSchedulerId()].taskStateUpdated(aMSchedulerEventTAStateUpdated.getTaskAttempt(), aMSchedulerEventTAStateUpdated.getState());
        } catch (Exception e) {
            String str = "Error in TaskScheduler for handling Task State Update, eventType=" + aMSchedulerEventTAStateUpdated.getType() + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(aMSchedulerEventTAStateUpdated.getSchedulerId(), this.appContext) + ", taskAttemptId=" + aMSchedulerEventTAStateUpdated.getTaskAttempt().getID() + ", state=" + aMSchedulerEventTAStateUpdated.getState();
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
    }

    @VisibleForTesting
    TaskScheduler createTaskScheduler(String str, int i, String str2, AppContext appContext, NamedEntityDescriptor namedEntityDescriptor, long j, int i2) throws TezException {
        TaskSchedulerContext wrapTaskSchedulerContext = wrapTaskSchedulerContext(new TaskSchedulerContextImpl(this, appContext, i2, str2, j, str, i, namedEntityDescriptor.getUserPayload()));
        String entityName = namedEntityDescriptor.getEntityName();
        return entityName.equals(TezConstants.getTezYarnServicePluginName()) ? createYarnTaskScheduler(wrapTaskSchedulerContext, i2) : entityName.equals(TezConstants.getTezUberServicePluginName()) ? createUberTaskScheduler(wrapTaskSchedulerContext, i2) : createCustomTaskScheduler(wrapTaskSchedulerContext, namedEntityDescriptor, i2);
    }

    @VisibleForTesting
    TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext taskSchedulerContext) {
        return new TaskSchedulerContextImplWrapper(taskSchedulerContext, this.appCallbackExecutor);
    }

    @VisibleForTesting
    TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, int i) {
        LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
        return new YarnTaskSchedulerService(taskSchedulerContext);
    }

    @VisibleForTesting
    TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int i) {
        LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}", Long.valueOf(taskSchedulerContext.getCustomClusterIdentifier()));
        return new LocalTaskSchedulerService(taskSchedulerContext);
    }

    TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext, NamedEntityDescriptor namedEntityDescriptor, int i) throws TezException {
        LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", new Object[]{namedEntityDescriptor.getEntityName(), namedEntityDescriptor.getClassName(), Long.valueOf(taskSchedulerContext.getCustomClusterIdentifier())});
        return (TaskScheduler) ReflectionUtils.createClazzInstance(namedEntityDescriptor.getClassName(), new Class[]{TaskSchedulerContext.class}, new Object[]{taskSchedulerContext});
    }

    @VisibleForTesting
    protected void instantiateSchedulers(String str, int i, String str2, AppContext appContext) throws TezException {
        long j;
        int i2 = 0;
        for (int i3 = 0; i3 < this.taskSchedulerDescriptors.length; i3++) {
            if ((this.isLocalMode && this.taskSchedulerDescriptors[i3].getEntityName().equals(TezConstants.getTezUberServicePluginName())) || this.taskSchedulerDescriptors[i3].getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
                j = appContext.getApplicationID().getClusterTimestamp();
            } else {
                int i4 = i2;
                i2++;
                j = 111101111 + (i4 * 111111111);
            }
            this.taskSchedulers[i3] = new TaskSchedulerWrapper(createTaskScheduler(str, i, str2, appContext, this.taskSchedulerDescriptors[i3], j, i3));
            this.taskSchedulerServiceWrappers[i3] = new ServicePluginLifecycleAbstractService(this.taskSchedulers[i3].getTaskScheduler());
        }
    }

    public synchronized void serviceStart() throws Exception {
        InetSocketAddress bindAddress = this.clientService.getBindAddress();
        this.dagAppMaster = this.appContext.getAppMaster();
        instantiateSchedulers(bindAddress.getHostName(), bindAddress.getPort(), this.webUI != null ? this.webUI.getTrackingURL() : "", this.appContext);
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            this.taskSchedulerServiceWrappers[i].init(getConfig());
            this.taskSchedulerServiceWrappers[i].start();
            if (this.shouldUnregisterFlag.get()) {
                this.taskSchedulers[i].setShouldUnregister();
            }
        }
        this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") { // from class: org.apache.tez.dag.app.rm.TaskSchedulerManager.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (!TaskSchedulerManager.this.stopEventHandling && !Thread.currentThread().isInterrupted()) {
                    try {
                        if (TaskSchedulerManager.this.eventQueue.peek() == null) {
                            TaskSchedulerManager.this.notifyForTest();
                        }
                        AMSchedulerEvent take = TaskSchedulerManager.this.eventQueue.take();
                        try {
                            TaskSchedulerManager.this.handleEvent(take);
                        } catch (Throwable th) {
                            TaskSchedulerManager.LOG.error("Error in handling event type " + take.getType() + " to the TaskScheduler", th);
                            TaskSchedulerManager.this.sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
                            return;
                        } finally {
                            TaskSchedulerManager.this.notifyForTest();
                        }
                    } catch (InterruptedException e) {
                        if (!TaskSchedulerManager.this.stopEventHandling) {
                            TaskSchedulerManager.LOG.warn("Continuing after interrupt : ", e);
                        }
                    }
                }
            }
        };
        this.eventHandlingThread.start();
    }

    protected void notifyForTest() {
    }

    public void initiateStop() {
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            if (this.taskSchedulers[i] != null) {
                try {
                    this.taskSchedulers[i].getTaskScheduler().initiateStop();
                } catch (Exception e) {
                    LOG.error("Failed to do a clean initiateStop for Scheduler: " + Utils.getTaskSchedulerIdentifierString(i, this.appContext), e);
                }
            }
        }
    }

    public void serviceStop() throws InterruptedException {
        synchronized (this) {
            this.stopEventHandling = true;
            if (this.eventHandlingThread != null) {
                this.eventHandlingThread.interrupt();
            }
        }
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            if (this.taskSchedulers[i] != null) {
                this.taskSchedulerServiceWrappers[i].stop();
            }
        }
        LOG.info("Shutting down AppCallbackExecutor");
        this.appCallbackExecutor.shutdownNow();
        this.appCallbackExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
    }

    public synchronized void taskAllocated(int i, Object obj, Object obj2, Container container) {
        AMSchedulerEventTALaunchRequest aMSchedulerEventTALaunchRequest = (AMSchedulerEventTALaunchRequest) obj2;
        ContainerId id = container.getId();
        if (this.appContext.getAllContainers().addContainerIfNew(container, i, aMSchedulerEventTALaunchRequest.getLauncherId(), aMSchedulerEventTALaunchRequest.getTaskCommId())) {
            this.appContext.getNodeTracker().nodeSeen(container.getNodeId(), i);
            sendEvent(new AMNodeEventContainerAllocated(container.getNodeId(), i, container.getId()));
        }
        TaskAttempt taskAttempt = aMSchedulerEventTALaunchRequest.getTaskAttempt();
        if (!$assertionsDisabled && !obj.equals(taskAttempt)) {
            throw new AssertionError();
        }
        if (this.appContext.getAllContainers().get(id).getState() == AMContainerState.ALLOCATED) {
            sendEvent(new AMContainerEventLaunchRequest(id, taskAttempt.getVertexID(), aMSchedulerEventTALaunchRequest.getContainerContext(), aMSchedulerEventTALaunchRequest.getLauncherId(), aMSchedulerEventTALaunchRequest.getTaskCommId()));
        }
        sendEvent(new AMContainerEventAssignTA(id, taskAttempt.getID(), aMSchedulerEventTALaunchRequest.getRemoteTaskSpec(), aMSchedulerEventTALaunchRequest.getContainerContext().getLocalResources(), aMSchedulerEventTALaunchRequest.getContainerContext().getCredentials(), aMSchedulerEventTALaunchRequest.getPriority()));
    }

    public synchronized void containerCompleted(int i, Object obj, ContainerStatus containerStatus) {
        AMContainer aMContainer = this.appContext.getAllContainers().get(containerStatus.getContainerId());
        if (aMContainer != null) {
            String str = "Container completed. ";
            TaskAttemptTerminationCause taskAttemptTerminationCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
            int exitStatus = containerStatus.getExitStatus();
            if (exitStatus == -102) {
                str = "Container preempted externally. ";
                taskAttemptTerminationCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
            } else if (exitStatus == -101) {
                str = "Container disk failed. ";
                taskAttemptTerminationCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
            } else if (exitStatus != 0) {
                str = "Container failed, exitCode=" + exitStatus + ". ";
            }
            if (containerStatus.getDiagnostics() != null) {
                str = str + containerStatus.getDiagnostics();
            }
            sendEvent(new AMContainerEventCompleted(aMContainer.getContainerId(), exitStatus, str, taskAttemptTerminationCause));
        }
    }

    public synchronized void containerBeingReleased(int i, ContainerId containerId) {
        if (this.appContext.getAllContainers().get(containerId) != null) {
            sendEvent(new AMContainerEventStopRequest(containerId));
        }
    }

    public synchronized void nodesUpdated(int i, List<NodeReport> list) {
        Iterator<NodeReport> it = list.iterator();
        while (it.hasNext()) {
            this.eventHandler.handle(new AMNodeEventStateChanged(it.next(), i));
        }
    }

    public synchronized void appShutdownRequested(int i) {
        LOG.info("App shutdown requested by scheduler {}", Integer.valueOf(i));
        sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
    }

    public synchronized void setApplicationRegistrationData(int i, Resource resource, Map<ApplicationAccessType, String> map, ByteBuffer byteBuffer, String str) {
        this.appContext.getClusterInfo().setMaxContainerCapability(resource);
        this.appContext.setQueueName(str);
        this.appAcls = map;
        this.clientService.setClientAMSecretKey(byteBuffer);
    }

    public TaskSchedulerContext.AppFinalStatus getFinalAppStatus() {
        FinalApplicationStatus applyFinalApplicationStatusCorrection;
        FinalApplicationStatus finalApplicationStatus = FinalApplicationStatus.UNDEFINED;
        StringBuffer stringBuffer = new StringBuffer();
        if (this.dagAppMaster == null) {
            applyFinalApplicationStatusCorrection = FinalApplicationStatus.UNDEFINED;
            stringBuffer.append("App not yet initialized");
        } else {
            DAGAppMasterState state = this.dagAppMaster.getState();
            applyFinalApplicationStatusCorrection = this.hadoopShim.applyFinalApplicationStatusCorrection(state == DAGAppMasterState.SUCCEEDED ? FinalApplicationStatus.SUCCEEDED : (state == DAGAppMasterState.KILLED || (state == DAGAppMasterState.RUNNING && this.isSignalled)) ? FinalApplicationStatus.KILLED : (state == DAGAppMasterState.FAILED || state == DAGAppMasterState.ERROR) ? FinalApplicationStatus.FAILED : FinalApplicationStatus.UNDEFINED, this.dagAppMaster.isSession(), state == DAGAppMasterState.ERROR);
            List<String> diagnostics = this.dagAppMaster.getDiagnostics();
            if (diagnostics != null) {
                Iterator<String> it = diagnostics.iterator();
                while (it.hasNext()) {
                    stringBuffer.append(it.next()).append("\n");
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Setting job diagnostics to " + stringBuffer.toString());
        }
        return new TaskSchedulerContext.AppFinalStatus(applyFinalApplicationStatusCorrection, stringBuffer.toString(), this.historyUrl);
    }

    public float getProgress(int i) {
        try {
            int clusterNodeCount = this.taskSchedulers[0].getClusterNodeCount();
            if (clusterNodeCount != this.cachedNodeCount) {
                this.cachedNodeCount = clusterNodeCount;
                sendEvent(new AMNodeEventNodeCountUpdated(this.cachedNodeCount, i));
            }
            return this.dagAppMaster.getProgress();
        } catch (Exception e) {
            String str = "Error in TaskScheduler while getting node count, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
            throw new RuntimeException(e);
        }
    }

    public void reportError(int i, ServicePluginError servicePluginError, String str, DagInfo dagInfo) {
        if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) {
            LOG.info("Error reported by scheduler {} - {}", Utils.getTaskSchedulerIdentifierString(i, this.appContext) + ": " + str);
            if (this.taskSchedulerDescriptors[i].getClassName().equals(YarnTaskSchedulerService.class.getName())) {
                LOG.warn("Reporting a SchedulerServiceError to the DAGAppMaster since the error was reported by the default YARN Task Scheduler");
                sendEvent(new DAGAppMasterEventSchedulingServiceError(str));
                return;
            }
            return;
        }
        if (servicePluginError.getErrorType() != ServicePluginError.ErrorType.PERMANENT) {
            Utils.processNonFatalServiceErrorReport(Utils.getTaskSchedulerIdentifierString(i, this.appContext), servicePluginError, str, dagInfo, this.appContext, "TaskScheduler");
            return;
        }
        String str2 = "Fatal error reported by TaskScheduler, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext) + ", servicePluginError=" + servicePluginError + ", diagnostics= " + (str == null ? "" : str);
        LOG.error(str2);
        sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str2, null));
    }

    public void dagCompleted() {
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            try {
                this.taskSchedulers[i].dagComplete();
            } catch (Exception e) {
                String str = "Error in TaskScheduler when notified for Dag Completion, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
                LOG.error(str, e);
                sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
            }
        }
    }

    public void dagSubmitted() {
    }

    public void preemptContainer(int i, ContainerId containerId) {
        AMContainer aMContainer = this.appContext.getAllContainers().get(containerId);
        try {
            this.taskSchedulers[aMContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
        } catch (Exception e) {
            String str = "Error in TaskScheduler when preempting container, scheduler=" + Utils.getTaskSchedulerIdentifierString(aMContainer.getTaskSchedulerIdentifier(), this.appContext) + ", containerId=" + containerId;
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
        }
        sendEvent(new AMContainerEventCompleted(containerId, -1000, "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
    }

    public void setShouldUnregisterFlag() {
        LOG.info("TaskScheduler notified that it should unregister from RM");
        this.shouldUnregisterFlag.set(true);
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            if (this.taskSchedulers[i] != null) {
                try {
                    this.taskSchedulers[i].setShouldUnregister();
                } catch (Exception e) {
                    String str = "Error in TaskScheduler when setting Unregister Flag, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
                    LOG.error(str, e);
                    sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
                }
            }
        }
    }

    public ContainerSignatureMatcher getContainerSignatureMatcher() {
        return this.containerSignatureMatcher;
    }

    public boolean hasUnregistered() {
        if (this.taskSchedulers.length == 0) {
            return false;
        }
        boolean z = true;
        for (int i = 0; i < this.taskSchedulers.length; i++) {
            if (this.taskSchedulers[i] == null) {
                return false;
            }
            try {
                z &= this.taskSchedulers[i].hasUnregistered();
            } catch (Exception e) {
                z = false;
                String str = "Error in TaskScheduler when checking if a scheduler has unregistered, scheduler=" + Utils.getTaskSchedulerIdentifierString(i, this.appContext);
                LOG.error(str, e);
                sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR, str, e));
            }
            if (!z) {
                return z;
            }
        }
        return z;
    }

    @VisibleForTesting
    public String getHistoryUrl() {
        Configuration aMConf = this.appContext.getAMConf();
        String str = "";
        String str2 = aMConf.get("tez.am.tez-ui.history-url.template", "__HISTORY_URL_BASE__/#/tez-app/__APPLICATION_ID__");
        String str3 = aMConf.get("tez.tez-ui.history-url.base", "");
        if (!str2.isEmpty() && !str3.isEmpty()) {
            str = str2.replaceAll(APPLICATION_ID_PLACEHOLDER, this.appContext.getApplicationID().toString()).replaceAll(HISTORY_URL_BASE, str3).replaceAll("([^:])/{2,}", "$1/");
            if (aMConf.getBoolean("tez.am.ui.history.url.scheme.check.enabled", true) && !str.startsWith("http")) {
                str = "http://" + str;
            }
        }
        return str;
    }

    public String getTaskSchedulerClassName(int i) {
        return this.taskSchedulers[i].getTaskScheduler().getClass().getName();
    }

    static {
        $assertionsDisabled = !TaskSchedulerManager.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TaskSchedulerManager.class);
    }
}
