package org.apache.tez.dag.app;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.Utils;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/dag/app/TaskCommunicatorManager.class */
public class TaskCommunicatorManager extends AbstractService implements TaskCommunicatorManagerInterface {
    private final AppContext context;
    private final TaskCommunicatorWrapper[] taskCommunicators;
    private final TaskCommunicatorContext[] taskCommunicatorContexts;
    protected final ServicePluginLifecycleAbstractService[] taskCommunicatorServiceWrappers;
    protected final TaskHeartbeatHandler taskHeartbeatHandler;
    protected final ContainerHeartbeatHandler containerHeartbeatHandler;
    private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE;
    private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts;
    private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers;
    private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicatorManager.class);
    private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);

    /* renamed from: org.apache.tez.dag.app.TaskCommunicatorManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/tez/dag/app/TaskCommunicatorManager$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$tez$runtime$api$impl$EventType = new int[EventType.values().length];

        static {
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventType[EventType.TASK_ATTEMPT_FAILED_EVENT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventType[EventType.TASK_ATTEMPT_KILLED_EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventType[EventType.TASK_ATTEMPT_COMPLETED_EVENT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType = new int[EventMetaData.EventProducerConsumerType.values().length];
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.INPUT.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.PROCESSOR.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.OUTPUT.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[EventMetaData.EventProducerConsumerType.SYSTEM.ordinal()] = 4;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/TaskCommunicatorManager$ContainerInfo.class */
    private static final class ContainerInfo {
        TezTaskAttemptID taskAttemptId;

        ContainerInfo(TezTaskAttemptID tezTaskAttemptID) {
            this.taskAttemptId = tezTaskAttemptID;
        }
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext, TaskHeartbeatHandler taskHeartbeatHandler, ContainerHeartbeatHandler containerHeartbeatHandler) {
        super(TaskCommunicatorManager.class.getName());
        this.RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
        this.registeredAttempts = new ConcurrentHashMap();
        this.registeredContainers = new ConcurrentHashMap();
        this.context = appContext;
        this.taskHeartbeatHandler = taskHeartbeatHandler;
        this.containerHeartbeatHandler = containerHeartbeatHandler;
        this.taskCommunicators = new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)};
        this.taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()};
        this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{new ServicePluginLifecycleAbstractService(taskCommunicator)};
    }

    public TaskCommunicatorManager(AppContext appContext, TaskHeartbeatHandler taskHeartbeatHandler, ContainerHeartbeatHandler containerHeartbeatHandler, List<NamedEntityDescriptor> list) throws TezException {
        super(TaskCommunicatorManager.class.getName());
        this.RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
        this.registeredAttempts = new ConcurrentHashMap();
        this.registeredContainers = new ConcurrentHashMap();
        this.context = appContext;
        this.taskHeartbeatHandler = taskHeartbeatHandler;
        this.containerHeartbeatHandler = containerHeartbeatHandler;
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("TaskCommunicators must be specified");
        }
        this.taskCommunicators = new TaskCommunicatorWrapper[list.size()];
        this.taskCommunicatorContexts = new TaskCommunicatorContext[list.size()];
        this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[list.size()];
        for (int i = 0; i < list.size(); i++) {
            this.taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(appContext, this, list.get(i).getUserPayload(), i);
            this.taskCommunicators[i] = new TaskCommunicatorWrapper(createTaskCommunicator(list.get(i), i));
            this.taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService(this.taskCommunicators[i].getTaskCommunicator());
        }
    }

    public void serviceStart() {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            this.taskCommunicatorServiceWrappers[i].init(getConfig());
            this.taskCommunicatorServiceWrappers[i].start();
        }
    }

    public void serviceStop() {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            this.taskCommunicatorServiceWrappers[i].stop();
        }
    }

    @VisibleForTesting
    TaskCommunicator createTaskCommunicator(NamedEntityDescriptor namedEntityDescriptor, int i) throws TezException {
        return namedEntityDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName()) ? createDefaultTaskCommunicator(this.taskCommunicatorContexts[i]) : namedEntityDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName()) ? createUberTaskCommunicator(this.taskCommunicatorContexts[i]) : createCustomTaskCommunicator(this.taskCommunicatorContexts[i], namedEntityDescriptor);
    }

    @VisibleForTesting
    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Creating Default Task Communicator");
        return new TezTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Creating Default Local Task Communicator");
        return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, NamedEntityDescriptor namedEntityDescriptor) throws TezException {
        LOG.info("Creating TaskCommunicator {}:{} " + namedEntityDescriptor.getEntityName(), namedEntityDescriptor.getClassName());
        try {
            return (TaskCommunicator) ReflectionUtils.getClazz(namedEntityDescriptor.getClassName()).getConstructor(TaskCommunicatorContext.class).newInstance(taskCommunicatorContext);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TezUncheckedException(e);
        }
    }

    public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest taskHeartbeatRequest) throws IOException, TezException {
        TaskAttemptTerminationCause taskAttemptTerminationCause;
        ContainerId containerId = ConverterUtils.toContainerId(taskHeartbeatRequest.getContainerIdentifier());
        LOG.debug("Received heartbeat from container, request={}", taskHeartbeatRequest);
        if (!this.registeredContainers.containsKey(containerId)) {
            LOG.warn("Received task heartbeat from unknown container with id: " + containerId + ", asking it to die");
            return this.RESPONSE_SHOULD_DIE;
        }
        pingContainerHeartbeatHandler(containerId);
        TaskAttemptEventInfo taskAttemptEventInfo = new TaskAttemptEventInfo(0, null, 0);
        TezTaskAttemptID taskAttemptId = taskHeartbeatRequest.getTaskAttemptId();
        if (taskAttemptId != null) {
            ContainerId containerId2 = this.registeredAttempts.get(taskAttemptId);
            if (containerId2 == null || !containerId2.equals(containerId)) {
                LOG.info("Attempt: " + taskAttemptId + " is not recognized for heartbeats");
                return this.RESPONSE_SHOULD_DIE;
            }
            List<TezEvent> events = taskHeartbeatRequest.getEvents();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ping from " + taskAttemptId.toString() + " events: " + (events != null ? events.size() : -1));
            }
            long time = this.context.getClock().getTime();
            ArrayList<TezEvent> arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            TaskAttemptEventStatusUpdate taskAttemptEventStatusUpdate = null;
            boolean z = false;
            for (TezEvent tezEvent : ListUtils.emptyIfNull(events)) {
                tezEvent.setEventReceivedTime(time);
                EventType eventType = tezEvent.getEventType();
                if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
                    taskAttemptEventStatusUpdate = new TaskAttemptEventStatusUpdate(taskAttemptId, tezEvent.getEvent());
                } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT || eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) {
                    arrayList.add(tezEvent);
                } else {
                    if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
                        z = true;
                    }
                    if (eventType == EventType.DATA_MOVEMENT_EVENT || eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT || eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT || eventType == EventType.VERTEX_MANAGER_EVENT) {
                        arrayList2.add(tezEvent);
                    }
                    arrayList3.add(tezEvent);
                }
            }
            if (taskAttemptEventStatusUpdate != null) {
                taskAttemptEventStatusUpdate.setReadErrorReported(z);
                sendEvent(taskAttemptEventStatusUpdate);
            }
            if (!arrayList2.isEmpty()) {
                sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptId, arrayList2));
            }
            Preconditions.checkArgument(arrayList.size() <= 1, "Multiple TaskAttemptFinishedEvent");
            for (TezEvent tezEvent2 : arrayList) {
                EventMetaData sourceInfo = tezEvent2.getSourceInfo();
                switch (AnonymousClass1.$SwitchMap$org$apache$tez$runtime$api$impl$EventType[tezEvent2.getEventType().ordinal()]) {
                    case 1:
                    case 2:
                        switch (AnonymousClass1.$SwitchMap$org$apache$tez$runtime$api$impl$EventMetaData$EventProducerConsumerType[sourceInfo.getEventGenerator().ordinal()]) {
                            case 1:
                                taskAttemptTerminationCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
                                break;
                            case 2:
                                taskAttemptTerminationCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
                                break;
                            case 3:
                                taskAttemptTerminationCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
                                break;
                            case 4:
                                taskAttemptTerminationCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
                                break;
                            default:
                                throw new TezUncheckedException("Unknown EventProducerConsumerType: " + sourceInfo.getEventGenerator());
                        }
                        if (tezEvent2.getEventType() == EventType.TASK_ATTEMPT_FAILED_EVENT) {
                            TaskAttemptFailedEvent event = tezEvent2.getEvent();
                            sendEvent(new TaskAttemptEventAttemptFailed(sourceInfo.getTaskAttemptID(), TaskAttemptEventType.TA_FAILED, event.getTaskFailureType(), "Error: " + event.getDiagnostics(), taskAttemptTerminationCause));
                            break;
                        } else {
                            sendEvent(new TaskAttemptEventAttemptKilled(sourceInfo.getTaskAttemptID(), "Error: " + tezEvent2.getEvent().getDiagnostics(), taskAttemptTerminationCause));
                            break;
                        }
                    case 3:
                        sendEvent(new TaskAttemptEvent(sourceInfo.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
                        break;
                    default:
                        throw new TezUncheckedException("Unhandled tez event type: " + tezEvent2.getEventType());
                }
            }
            if (!arrayList3.isEmpty()) {
                sendEvent(new VertexEventRouteEvent(taskAttemptId.getVertexID(), Collections.unmodifiableList(arrayList3)));
            }
            this.taskHeartbeatHandler.pinged(taskAttemptId);
            taskAttemptEventInfo = this.context.getCurrentDAG().getVertex(taskAttemptId.getVertexID()).getTaskAttemptTezEvents(taskAttemptId, taskHeartbeatRequest.getStartIndex(), taskHeartbeatRequest.getPreRoutedStartIndex(), taskHeartbeatRequest.getMaxEvents());
        }
        return new TaskHeartbeatResponse(false, taskAttemptEventInfo.getEvents(), taskAttemptEventInfo.getNextFromEventId(), taskAttemptEventInfo.getNextPreRoutedFromEventId());
    }

    public void taskAlive(TezTaskAttemptID tezTaskAttemptID) {
        this.taskHeartbeatHandler.pinged(tezTaskAttemptID);
    }

    public void containerAlive(ContainerId containerId) {
        pingContainerHeartbeatHandler(containerId);
    }

    public void taskSubmitted(TezTaskAttemptID tezTaskAttemptID, ContainerId containerId) {
        sendEvent(new TaskAttemptEventSubmitted(tezTaskAttemptID, containerId));
        pingContainerHeartbeatHandler(containerId);
    }

    public void taskStartedRemotely(TezTaskAttemptID tezTaskAttemptID) {
        sendEvent(new TaskAttemptEventStartedRemotely(tezTaskAttemptID));
    }

    public void taskKilled(TezTaskAttemptID tezTaskAttemptID, TaskAttemptEndReason taskAttemptEndReason, String str) {
        sendEvent(new TaskAttemptEventAttemptKilled(tezTaskAttemptID, str, TezUtilsInternal.fromTaskAttemptEndReason(taskAttemptEndReason)));
    }

    public void taskFailed(TezTaskAttemptID tezTaskAttemptID, TaskFailureType taskFailureType, TaskAttemptEndReason taskAttemptEndReason, String str) {
        sendEvent(new TaskAttemptEventAttemptFailed(tezTaskAttemptID, TaskAttemptEventType.TA_FAILED, taskFailureType, str, TezUtilsInternal.fromTaskAttemptEndReason(taskAttemptEndReason)));
    }

    public void vertexStateUpdateNotificationReceived(VertexStateUpdate vertexStateUpdate, int i) {
        try {
            this.taskCommunicators[i].onVertexStateUpdated(vertexStateUpdate);
        } catch (Exception e) {
            String str = "Error in TaskCommunicator when handling vertex state update notification, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", vertexName=" + vertexStateUpdate.getVertexName() + ", vertexState=" + vertexStateUpdate.getVertexState();
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str, e));
        }
    }

    public boolean canCommit(TezTaskAttemptID tezTaskAttemptID) throws IOException {
        this.taskHeartbeatHandler.progressing(tezTaskAttemptID);
        pingContainerHeartbeatHandler(tezTaskAttemptID);
        return this.context.getCurrentDAG().getVertex(tezTaskAttemptID.getVertexID()).getTask(tezTaskAttemptID.getTaskID()).canCommit(tezTaskAttemptID);
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void dagComplete(DAG dag) {
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            try {
                ((TaskCommunicatorContextImpl) this.taskCommunicatorContexts[i]).dagCompleteStart(dag);
                this.taskCommunicators[i].dagComplete(dag.getID().getId());
                ((TaskCommunicatorContextImpl) this.taskCommunicatorContexts[i]).dagCompleteEnd();
            } catch (Exception e) {
                String str = "Error in TaskCommunicator when notifying for DAG completion, communicator=" + Utils.getTaskCommIdentifierString(i, this.context);
                LOG.error(str, e);
                sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str, e));
            }
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void dagSubmitted() {
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void registerRunningContainer(ContainerId containerId, int i) {
        LOG.debug("ContainerId: {} registered with TaskAttemptListener", containerId);
        if (this.registeredContainers.put(containerId, NULL_CONTAINER_INFO) != null) {
            throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
        }
        NodeId nodeId = this.context.getAllContainers().get(containerId).getContainer().getNodeId();
        try {
            this.taskCommunicators[i].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
        } catch (Exception e) {
            String str = "Error in TaskCommunicator when registering running Container, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", containerId=" + containerId + ", nodeId=" + nodeId;
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str, e));
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void unregisterRunningContainer(ContainerId containerId, int i, ContainerEndReason containerEndReason, String str) {
        LOG.debug("Unregistering Container from TaskAttemptListener: {}", containerId);
        ContainerInfo remove = this.registeredContainers.remove(containerId);
        if (remove.taskAttemptId != null) {
            this.registeredAttempts.remove(remove.taskAttemptId);
        }
        try {
            this.taskCommunicators[i].registerContainerEnd(containerId, containerEndReason, str);
        } catch (Exception e) {
            String str2 = "Error in TaskCommunicator when unregistering Container, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", containerId=" + containerId;
            LOG.error(str2, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str2, e));
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void registerTaskAttempt(AMContainerTask aMContainerTask, ContainerId containerId, int i) {
        ContainerInfo containerInfo = this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
        }
        if (containerInfo.taskAttemptId != null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " with existing assignment to: " + containerInfo.taskAttemptId);
        }
        this.registeredContainers.put(containerId, new ContainerInfo(aMContainerTask.getTask().getTaskAttemptID()));
        ContainerId put = this.registeredAttempts.put(aMContainerTask.getTask().getTaskAttemptID(), containerId);
        if (put != null) {
            throw new TezUncheckedException("Registering task attempt: " + aMContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + put);
        }
        try {
            this.taskCommunicators[i].registerRunningTaskAttempt(containerId, aMContainerTask.getTask(), aMContainerTask.getAdditionalResources(), aMContainerTask.getCredentials(), aMContainerTask.haveCredentialsChanged(), aMContainerTask.getPriority());
        } catch (Exception e) {
            String str = "Error in TaskCommunicator when registering Task Attempt, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", containerId=" + containerId + ", taskId=" + aMContainerTask.getTask().getTaskAttemptID();
            LOG.error(str, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str, e));
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void unregisterTaskAttempt(TezTaskAttemptID tezTaskAttemptID, int i, TaskAttemptEndReason taskAttemptEndReason, String str) {
        ContainerId remove = this.registeredAttempts.remove(tezTaskAttemptID);
        if (remove == null) {
            LOG.warn("Unregister task attempt: " + tezTaskAttemptID + " from unknown container");
            return;
        }
        if (this.registeredContainers.get(remove) == null) {
            LOG.warn("Unregister task attempt: " + tezTaskAttemptID + " from non-registered container: " + remove);
            return;
        }
        this.registeredContainers.put(remove, NULL_CONTAINER_INFO);
        try {
            this.taskCommunicators[i].unregisterRunningTaskAttempt(tezTaskAttemptID, taskAttemptEndReason, str);
        } catch (Exception e) {
            String str2 = "Error in TaskCommunicator when unregistering Task Attempt, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", containerId=" + remove + ", taskId=" + tezTaskAttemptID;
            LOG.error(str2, e);
            sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str2, e));
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public TaskCommunicatorWrapper getTaskCommunicator(int i) {
        return this.taskCommunicators[i];
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public void reportError(int i, ServicePluginError servicePluginError, String str, DagInfo dagInfo) {
        if (servicePluginError.getErrorType() != ServicePluginError.ErrorType.PERMANENT) {
            Utils.processNonFatalServiceErrorReport(Utils.getTaskCommIdentifierString(i, this.context), servicePluginError, str, dagInfo, this.context, "TaskCommunicator");
            return;
        }
        String str2 = "Fatal Error reported by TaskCommunicator, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", servicePluginError=" + servicePluginError + ", diagnostics= " + (str == null ? "" : str);
        LOG.error(str2 + ", Diagnostics=" + str);
        sendEvent(new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, str2, null));
    }

    private void pingContainerHeartbeatHandler(ContainerId containerId) {
        this.containerHeartbeatHandler.pinged(containerId);
    }

    private void pingContainerHeartbeatHandler(TezTaskAttemptID tezTaskAttemptID) {
        ContainerId containerId = this.registeredAttempts.get(tezTaskAttemptID);
        if (containerId != null) {
            this.containerHeartbeatHandler.pinged(containerId);
        } else {
            LOG.warn("Handling communication from attempt: " + tezTaskAttemptID + ", ContainerId not known for this attempt");
        }
    }

    private void sendEvent(Event<?> event) {
        this.context.getEventHandler().handle(event);
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public String getTaskCommunicatorClassName(int i) {
        return this.taskCommunicators[i].getTaskCommunicator().getClass().getName();
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public String getInProgressLogsUrl(int i, TezTaskAttemptID tezTaskAttemptID, NodeId nodeId) {
        try {
            return this.taskCommunicators[i].getInProgressLogsUrl(tezTaskAttemptID, nodeId);
        } catch (Exception e) {
            LOG.warn("Failed to retrieve InProgressLogsUrl from TaskCommunicator,, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", attemptId=" + tezTaskAttemptID, e);
            return null;
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public String getCompletedLogsUrl(int i, TezTaskAttemptID tezTaskAttemptID, NodeId nodeId) {
        try {
            return this.taskCommunicators[i].getCompletedLogsUrl(tezTaskAttemptID, nodeId);
        } catch (Exception e) {
            LOG.warn("Failed to retrieve CompletedLogsUrl from TaskCommunicator,, communicator=" + Utils.getTaskCommIdentifierString(i, this.context) + ", attemptId=" + tezTaskAttemptID, e);
            return null;
        }
    }

    @Override // org.apache.tez.dag.app.TaskCommunicatorManagerInterface
    public long getTotalUsedMemory() {
        long j = 0;
        for (int i = 0; i < this.taskCommunicators.length; i++) {
            j += this.taskCommunicators[i].getTaskCommunicator().getTotalUsedMemory();
        }
        return j;
    }
}
