package org.apache.tez.dag.history;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.HistoryLogLevel;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/history/HistoryEventHandler.class */
/**
 * Composite service that dispatches {@link DAGHistoryEvent}s to the recovery service (when
 * recovery is enabled) and to the configured {@link HistoryLoggingService}.
 *
 * <p>Whether an event reaches the history logging service depends on the history log level
 * tracked per DAG (falling back to the AM-wide level) and, at the {@code TASK_ATTEMPT} level,
 * on the configured task-attempt termination-cause filters.
 */
public class HistoryEventHandler extends CompositeService {
    private static final Logger LOG = LoggerFactory.getLogger(HistoryEventHandler.class);
    private static final Logger LOG_CRITICAL_EVENTS =
            LoggerFactory.getLogger(LOG.getName() + ".criticalEvents");

    // Configuration keys and defaults read by this service.
    private static final String RECOVERY_ENABLED_KEY = "tez.dag.recovery.enabled";
    private static final String HISTORY_LOGGING_SERVICE_CLASS_KEY = "tez.history.logging.service.class";
    private static final String HISTORY_LOGGING_SERVICE_CLASS_DEFAULT =
            "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
    private static final String RECOVERY_SERVICE_CLASS_KEY = "tez.test.recovery-service-class";
    private static final String RECOVERY_SERVICE_CLASS_DEFAULT =
            "org.apache.tez.dag.history.recovery.RecoveryService";
    private static final String TASK_ATTEMPT_FILTERS_KEY = "tez.history.logging.taskattempt-filters";

    /** When the critical-events logger is below INFO, a summary line is logged every N events. */
    private static final long CRITICAL_EVENT_LOG_INTERVAL = 1000L;

    private final AppContext context;
    private RecoveryService recoveryService;
    private boolean recoveryEnabled;
    private HistoryLoggingService historyLoggingService;

    /** AM-wide log level, used when a DAG has no entry in {@link #dagIdToLogLevel}. */
    private HistoryLogLevel amHistoryLogLevel;
    private final Map<TezDAGID, HistoryLogLevel> dagIdToLogLevel = new ConcurrentHashMap<>();

    /** AM-wide task attempt filters, used when a DAG has no entry of its own; may be null. */
    private Set<TaskAttemptTerminationCause> amTaskAttemptFilters;
    private final Map<TezDAGID, Set<TaskAttemptTerminationCause>> dagIdToTaskAttemptFilters =
            new ConcurrentHashMap<>();

    /** TASK_ATTEMPT_STARTED events held back until the matching finished event is seen. */
    private final ConcurrentHashMap<TezTaskAttemptID, DAGHistoryEvent> suppressedEvents =
            new ConcurrentHashMap<>();

    private final AtomicLong criticalEventCount = new AtomicLong();

    /**
     * Creates the handler.
     *
     * @param appContext application context; its AM configuration is read during
     *                   {@link #serviceInit(Configuration)}
     */
    public HistoryEventHandler(AppContext appContext) {
        super(HistoryEventHandler.class.getName());
        this.context = appContext;
    }

    /**
     * Instantiates the history logging service (and the recovery service when recovery is
     * enabled), registers them as child services, loads the AM-level log level and task attempt
     * filters, and then delegates to {@code super.serviceInit}.
     *
     * @param configuration service configuration; the recovery service class name is read from it
     * @throws Exception if a service cannot be created or the superclass initialization fails
     */
    public void serviceInit(Configuration configuration) throws Exception {
        Configuration amConf = this.context.getAMConf();
        this.recoveryEnabled = amConf.getBoolean(RECOVERY_ENABLED_KEY, true);
        String historyServiceClassName =
                amConf.get(HISTORY_LOGGING_SERVICE_CLASS_KEY, HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
        LOG.info("Initializing HistoryEventHandler with recoveryEnabled={}, historyServiceClassName={}",
                this.recoveryEnabled, historyServiceClassName);

        this.historyLoggingService =
                (HistoryLoggingService) ReflectionUtils.createClazzInstance(historyServiceClassName);
        this.historyLoggingService.setAppContext(this.context);
        addService(this.historyLoggingService);

        if (this.recoveryEnabled) {
            String recoveryServiceClassName =
                    configuration.get(RECOVERY_SERVICE_CLASS_KEY, RECOVERY_SERVICE_CLASS_DEFAULT);
            this.recoveryService = (RecoveryService) ReflectionUtils.createClazzInstance(
                    recoveryServiceClassName, new Class[]{AppContext.class}, new Object[]{this.context});
            addService(this.recoveryService);
        }

        this.amHistoryLogLevel = HistoryLogLevel.getLogLevel(this.context.getAMConf(), HistoryLogLevel.DEFAULT);
        this.amTaskAttemptFilters = TezUtilsInternal.getEnums(
                this.context.getAMConf(), TASK_ATTEMPT_FILTERS_KEY, TaskAttemptTerminationCause.class, (String) null);
        super.serviceInit(configuration);
    }

    /**
     * Starts this service by delegating to {@code super.serviceStart}.
     *
     * @throws Exception if the superclass start fails
     */
    public void serviceStart() throws Exception {
        super.serviceStart();
    }

    /**
     * Logs the shutdown and delegates to {@code super.serviceStop}.
     *
     * @throws Exception if the superclass stop fails
     */
    public void serviceStop() throws Exception {
        LOG.info("Stopping HistoryEventHandler");
        super.serviceStop();
    }

    /**
     * Handles an event, letting any {@link IOException} propagate to the caller.
     *
     * <p>The event is passed to the recovery service if recovery is enabled and the event is a
     * recovery event. It is passed to the history logging service if it is a history event and
     * passes the log-level/filter checks; a previously suppressed start event for the same task
     * attempt is emitted first.
     *
     * @param dAGHistoryEvent the event to handle
     * @throws IOException propagated from the downstream services
     */
    public void handleCriticalEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
        TezDAGID dagId = dAGHistoryEvent.getDAGID();
        String dagIdStr = dagId != null ? dagId.toString() : "N/A";
        HistoryEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
        LOG.debug("Handling history event, eventType={}", historyEvent.getEventType());

        if (this.recoveryEnabled && historyEvent.isRecoveryEvent()) {
            this.recoveryService.handle(dAGHistoryEvent);
        }

        if (historyEvent.isHistoryEvent() && shouldLogEvent(dAGHistoryEvent)) {
            // Emit the held-back start event before its finished event.
            DAGHistoryEvent suppressedEvent = getSuppressedEvent(historyEvent);
            if (suppressedEvent != null) {
                this.historyLoggingService.handle(suppressedEvent);
            }
            this.historyLoggingService.handle(dAGHistoryEvent);
        }

        if (LOG_CRITICAL_EVENTS.isInfoEnabled()) {
            LOG_CRITICAL_EVENTS.info("[HISTORY][DAG:{}][Event:{}]: {}",
                    dagIdStr, historyEvent.getEventType().name(), historyEvent);
        } else {
            // Log the value this call produced; re-reading the counter could report a
            // different number if other threads incremented it in the meantime.
            long count = this.criticalEventCount.incrementAndGet();
            if (count % CRITICAL_EVENT_LOG_INTERVAL == 0) {
                LOG.info("Got {} critical events", count);
            }
        }
    }

    /**
     * Decides whether an event should be sent to the history logging service, updating the
     * per-DAG log level and filter maps on DAG_SUBMITTED, DAG_RECOVERED and DAG_FINISHED.
     *
     * @param dAGHistoryEvent the event under consideration
     * @return true if the event should be logged
     */
    private boolean shouldLogEvent(DAGHistoryEvent dAGHistoryEvent) {
        TezDAGID dagId = dAGHistoryEvent.getDAGID();
        HistoryLogLevel logLevel = dagId != null ? this.dagIdToLogLevel.get(dagId) : null;
        if (logLevel == null) {
            logLevel = this.amHistoryLogLevel;
        }

        HistoryEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
        HistoryEventType eventType = historyEvent.getEventType();
        if (eventType == HistoryEventType.DAG_SUBMITTED) {
            Configuration dagConf = ((DAGSubmittedEvent) historyEvent).getConf();
            logLevel = HistoryLogLevel.getLogLevel(dagConf, this.amHistoryLogLevel);
            this.dagIdToLogLevel.put(dagId, logLevel);
            maybeUpdateDagTaskAttemptFilters(dagId, logLevel, dagConf);
        } else if (eventType == HistoryEventType.DAG_RECOVERED) {
            if (this.context.getCurrentDAG() != null) {
                Configuration dagConf = this.context.getCurrentDAG().getConf();
                logLevel = HistoryLogLevel.getLogLevel(dagConf, this.amHistoryLogLevel);
                this.dagIdToLogLevel.put(dagId, logLevel);
                maybeUpdateDagTaskAttemptFilters(dagId, logLevel, dagConf);
            }
        } else if (eventType == HistoryEventType.DAG_FINISHED) {
            // The finished event itself is still evaluated against the level looked up above.
            this.dagIdToLogLevel.remove(dagId);
            this.dagIdToTaskAttemptFilters.remove(dagId);
            this.suppressedEvents.clear();
        }

        if (logLevel.shouldLog(historyEvent.getEventType().getHistoryLogLevel())) {
            return shouldLogTaskAttemptEvents(dAGHistoryEvent, logLevel);
        }
        return false;
    }

    /**
     * Applies the task-attempt filters. Only relevant when the log level is TASK_ATTEMPT, the
     * event is a task attempt started/finished event, and filters are configured; otherwise the
     * event is always logged.
     *
     * <p>With filters in effect, a started event is stored in {@link #suppressedEvents} and not
     * logged immediately. A finished event whose termination cause is in the filter set is
     * dropped together with its stored started event; any other finished event is logged.
     *
     * @param dAGHistoryEvent the event under consideration
     * @param historyLogLevel the effective log level for the event's DAG
     * @return true if the event should be logged now
     */
    private boolean shouldLogTaskAttemptEvents(DAGHistoryEvent dAGHistoryEvent, HistoryLogLevel historyLogLevel) {
        HistoryEvent historyEvent = dAGHistoryEvent.getHistoryEvent();
        HistoryEventType eventType = historyEvent.getEventType();
        boolean isAttemptLifecycleEvent = eventType == HistoryEventType.TASK_ATTEMPT_STARTED
                || eventType == HistoryEventType.TASK_ATTEMPT_FINISHED;
        if (historyLogLevel != HistoryLogLevel.TASK_ATTEMPT || !isAttemptLifecycleEvent) {
            return true;
        }

        TezDAGID dagId = dAGHistoryEvent.getDAGID();
        Set<TaskAttemptTerminationCause> filters =
                dagId != null ? this.dagIdToTaskAttemptFilters.get(dagId) : null;
        if (filters == null) {
            filters = this.amTaskAttemptFilters;
        }
        if (filters == null) {
            return true;
        }

        if (eventType == HistoryEventType.TASK_ATTEMPT_STARTED) {
            TaskAttemptStartedEvent startedEvent = (TaskAttemptStartedEvent) historyEvent;
            this.suppressedEvents.put(startedEvent.getTaskAttemptID(), dAGHistoryEvent);
            return false;
        }

        TaskAttemptFinishedEvent finishedEvent = (TaskAttemptFinishedEvent) historyEvent;
        if (filters.contains(finishedEvent.getTaskAttemptError())) {
            this.suppressedEvents.remove(finishedEvent.getTaskAttemptID());
            return false;
        }
        return true;
    }

    /**
     * Records DAG-specific task attempt filters when the DAG's log level is TASK_ATTEMPT and the
     * given configuration yields a non-null filter set.
     *
     * @param tezDAGID        DAG the filters belong to
     * @param historyLogLevel effective log level of the DAG
     * @param configuration   configuration to read the filters from
     */
    private void maybeUpdateDagTaskAttemptFilters(TezDAGID tezDAGID, HistoryLogLevel historyLogLevel, Configuration configuration) {
        if (historyLogLevel != HistoryLogLevel.TASK_ATTEMPT) {
            return;
        }
        Set<TaskAttemptTerminationCause> filters = TezUtilsInternal.getEnums(
                configuration, TASK_ATTEMPT_FILTERS_KEY, TaskAttemptTerminationCause.class, (String) null);
        if (filters != null) {
            this.dagIdToTaskAttemptFilters.put(tezDAGID, filters);
        }
    }

    /**
     * Removes and returns the stored started event matching a task attempt finished event.
     *
     * @param historyEvent the event being logged
     * @return the stored started event, or null if the event is not a task attempt finished
     *         event or nothing was stored for its attempt
     */
    private DAGHistoryEvent getSuppressedEvent(HistoryEvent historyEvent) {
        if (historyEvent.getEventType() != HistoryEventType.TASK_ATTEMPT_FINISHED) {
            return null;
        }
        return this.suppressedEvents.remove(((TaskAttemptFinishedEvent) historyEvent).getTaskAttemptID());
    }

    /**
     * Handles an event like {@link #handleCriticalEvent(DAGHistoryEvent)}, but logs an
     * {@link IOException} as a warning instead of propagating it.
     *
     * @param dAGHistoryEvent the event to handle
     */
    public void handle(DAGHistoryEvent dAGHistoryEvent) {
        try {
            handleCriticalEvent(dAGHistoryEvent);
        } catch (IOException e) {
            // Best-effort path: the failure is reported but intentionally not rethrown.
            LOG.warn("Failed to handle recovery event, eventType={}",
                    dAGHistoryEvent.getHistoryEvent().getEventType(), e);
        }
    }

    /**
     * Reports whether recovery has failed.
     *
     * @return the recovery service's failure state, or false when recovery is disabled
     */
    public boolean hasRecoveryFailed() {
        return this.recoveryEnabled && this.recoveryService.hasRecoveryFailed();
    }
}
