package org.apache.hadoop.yarn.event.multidispatcher;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.metrics.DispatcherEventMetrics;
import org.apache.hadoop.yarn.metrics.DispatcherEventMetricsImpl;
import org.apache.hadoop.yarn.metrics.DispatcherEventMetricsNoOps;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/hadoop-yarn-common-3.4.1.0-eep-940.jar:org/apache/hadoop/yarn/event/multidispatcher/MultiDispatcher.class */
/**
 * {@link Dispatcher} service that pairs every incoming event with the handler
 * looked up in a {@link MultiDispatcherLibrary} and submits the pair to a
 * {@link MultiDispatcherExecutor}.
 *
 * <p>Per-event-type bookkeeping goes through a {@link DispatcherEventMetrics}
 * instance chosen in {@link #serviceInit(Configuration)}. When the configured
 * monitor interval is at least one second, a single-threaded scheduler
 * periodically logs the non-empty worker queues.
 */
public class MultiDispatcher extends AbstractService implements Dispatcher {
    private final Logger log;
    private final String dispatcherName;
    private final MultiDispatcherLibrary library;
    private final Clock clock;
    /** Guards the hand-over between {@link #register} and {@link #serviceInit}. */
    private final Object metricsLock = new Object();
    /** Event types registered before the metrics sink was created. */
    private final List<Class<? extends Enum>> pendingMetricTypes = new ArrayList<>();
    private MultiDispatcherExecutor workerExecutor;
    private ScheduledThreadPoolExecutor monitorExecutor;
    private DispatcherEventMetrics metrics;

    /**
     * Creates a dispatcher whose internal name is {@code str} with every space
     * replaced by {@code '-'} and lower-cased. The normalised name is used for
     * the logger, the configuration lookup, the metrics source and the monitor
     * thread name.
     *
     * @param str human readable dispatcher name; must not be {@code null}
     */
    public MultiDispatcher(String str) {
        super("Dispatcher");
        this.clock = new MonotonicClock();
        // Locale.ROOT keeps the derived name stable regardless of the JVM default locale.
        this.dispatcherName = str.replace(" ", "-").toLowerCase(Locale.ROOT);
        this.log = LoggerFactory.getLogger(MultiDispatcher.class.getCanonicalName() + "." + this.dispatcherName);
        this.library = new MultiDispatcherLibrary();
    }

    /**
     * Builds and starts the worker executor, optionally starts the monitor
     * thread, and selects the metrics implementation: a no-op sink when metrics
     * are disabled, otherwise a {@link DispatcherEventMetricsImpl} that is also
     * registered with the default metrics system.
     *
     * @param configuration service configuration, forwarded to the superclass
     * @throws Exception propagated from the superclass or from component setup
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
        MultiDispatcherConfig dispatcherConfig = new MultiDispatcherConfig(getConfig(), this.dispatcherName);
        this.workerExecutor = new MultiDispatcherExecutor(this.log, dispatcherConfig, this.dispatcherName);
        this.workerExecutor.start();
        createMonitorThread(dispatcherConfig);

        DispatcherEventMetrics created;
        if (dispatcherConfig.getMetricsEnabled()) {
            created = new DispatcherEventMetricsImpl(this.dispatcherName);
            String sourceName = "Event metrics for " + this.dispatcherName;
            // The metrics object itself is the source; name and description are identical.
            DefaultMetricsSystem.instance().register(sourceName, sourceName, created);
        } else {
            created = new DispatcherEventMetricsNoOps(this.log);
        }

        synchronized (this.metricsLock) {
            this.metrics = created;
            // Catch up on event types that were registered before init.
            for (Class<? extends Enum> eventType : this.pendingMetricTypes) {
                created.init(eventType);
            }
            this.pendingMetricTypes.clear();
        }
    }

    /**
     * Shuts down the monitor scheduler and stops the worker executor. Either
     * component may be absent (monitoring disabled, or init did not get that
     * far), so both are checked before use.
     *
     * @throws Exception propagated from the worker executor
     */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        if (this.monitorExecutor != null) {
            this.monitorExecutor.shutdown();
        }
        if (this.workerExecutor != null) {
            this.workerExecutor.stop();
        }
    }

    /**
     * Returns a handler that submits each event to the worker executor and
     * records it in the metrics. Events arriving while the service is in the
     * {@code STOPPED} state are logged at warn level and discarded.
     *
     * @return handler that forwards events to the worker executor
     */
    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public EventHandler getEventHandler() {
        return event -> {
            if (isInState(Service.STATE.STOPPED)) {
                this.log.warn("Discard event {} because stopped state", event);
                return;
            }
            EventHandler target = this.library.getEventHandler(event);
            this.workerExecutor.execute(event, createRunnable(event, target));
            // NOTE(review): addEvent runs after submission, so a fast worker could call
            // removeEvent first - confirm the metrics implementation tolerates that order.
            this.metrics.addEvent(event.getType());
        };
    }

    /**
     * Registers {@code eventHandler} for the event type {@code cls} and
     * initialises the metrics for that type. If the metrics sink does not exist
     * yet (the service has not been initialised), the type is remembered and its
     * metrics are initialised during {@link #serviceInit(Configuration)}.
     *
     * @param cls          enum class identifying the event type
     * @param eventHandler handler to invoke for events of that type
     */
    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public void register(Class<? extends Enum> cls, EventHandler eventHandler) {
        this.library.register(cls, eventHandler);
        synchronized (this.metricsLock) {
            if (this.metrics == null) {
                this.pendingMetricTypes.add(cls);
            } else {
                this.metrics.init(cls);
            }
        }
    }

    /**
     * Wraps a handler invocation so that the elapsed time is reported and the
     * event is removed from the metrics whether the handler returns normally
     * or throws; a thrown exception still propagates to the executor.
     */
    private Runnable createRunnable(Event event, EventHandler eventHandler) {
        return () -> {
            long startedAt = this.clock.getTime();
            try {
                eventHandler.handle(event);
            } finally {
                this.metrics.updateRate(event.getType(), this.clock.getTime() - startedAt);
                this.metrics.removeEvent(event.getType());
            }
        };
    }

    /**
     * Starts the periodic monitor unless the configured interval is below one
     * second. The task first runs after 10 seconds and then every
     * {@code getMonitorSeconds()} seconds; it logs the non-empty queues at info
     * level and the metrics object at debug level.
     */
    private void createMonitorThread(MultiDispatcherConfig multiDispatcherConfig) {
        int monitorSeconds = multiDispatcherConfig.getMonitorSeconds();
        if (monitorSeconds < 1) {
            return;
        }
        this.monitorExecutor = new ScheduledThreadPoolExecutor(
                1,
                new BasicThreadFactory.Builder()
                        .namingPattern(this.dispatcherName + "-monitor-%d")
                        .build());
        this.monitorExecutor.scheduleAtFixedRate(() -> {
            List<String> nonEmptyQueues = this.workerExecutor.getQueuesSize().entrySet().stream()
                    // Explicit cast keeps this independent of the map's declared value type.
                    .filter(entry -> 0 < ((Long) entry.getValue()).longValue())
                    .map(entry -> String.format("%s has queue size %d", entry.getKey(), entry.getValue()))
                    .sorted()
                    .collect(Collectors.toList());
            if (!nonEmptyQueues.isEmpty()) {
                this.log.info("Event queue sizes: {}", nonEmptyQueues);
            }
            this.log.debug("Metrics: {}", this.metrics);
        }, 10L, monitorSeconds, TimeUnit.SECONDS);
    }
}
