package org.apache.hadoop.yarn.event;

import com.google.common.annotations.VisibleForTesting;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
/* loaded from: input_file:WEB-INF/lib/hadoop-yarn-common-2.7.6.500-eep-813.jar:org/apache/hadoop/yarn/event/AsyncDispatcher.class */
public class AsyncDispatcher extends AbstractService implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AsyncDispatcher.class);
    private static final Marker FATAL = MarkerFactory.getMarker("FATAL");
    private final BlockingQueue<Event> eventQueue;
    private volatile boolean stopped;
    private volatile boolean drainEventsOnStop;
    private volatile boolean drained;
    private Object waitForDrained;
    private volatile boolean blockNewEvents;
    private final EventHandler handlerInstance;
    private Thread eventHandlingThread;
    protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
    private boolean exitOnDispatchException;

    /* loaded from: input_file:WEB-INF/lib/hadoop-yarn-common-2.7.6.500-eep-813.jar:org/apache/hadoop/yarn/event/AsyncDispatcher$GenericEventHandler.class */
    class GenericEventHandler implements EventHandler<Event> {
        GenericEventHandler() {
        }

        @Override // org.apache.hadoop.yarn.event.EventHandler
        public void handle(Event event) {
            if (AsyncDispatcher.this.blockNewEvents) {
                return;
            }
            AsyncDispatcher.this.drained = false;
            int size = AsyncDispatcher.this.eventQueue.size();
            if (size != 0 && size % 1000 == 0) {
                AsyncDispatcher.LOG.info("Size of event-queue is " + size);
            }
            int remainingCapacity = AsyncDispatcher.this.eventQueue.remainingCapacity();
            if (remainingCapacity < 1000) {
                AsyncDispatcher.LOG.warn("Very low remaining capacity in the event-queue: " + remainingCapacity);
            }
            try {
                AsyncDispatcher.this.eventQueue.put(event);
            } catch (InterruptedException e) {
                if (!AsyncDispatcher.this.stopped) {
                    AsyncDispatcher.LOG.warn("AsyncDispatcher thread interrupted", (Throwable) e);
                }
                AsyncDispatcher.this.drained = AsyncDispatcher.this.eventQueue.isEmpty();
                throw new YarnRuntimeException(e);
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-yarn-common-2.7.6.500-eep-813.jar:org/apache/hadoop/yarn/event/AsyncDispatcher$MultiListenerHandler.class */
    static class MultiListenerHandler implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers = new ArrayList();

        @Override // org.apache.hadoop.yarn.event.EventHandler
        public void handle(Event event) {
            Iterator<EventHandler<Event>> it = this.listofHandlers.iterator();
            while (it.hasNext()) {
                it.next().handle(event);
            }
        }

        void addHandler(EventHandler<Event> eventHandler) {
            this.listofHandlers.add(eventHandler);
        }
    }

    public AsyncDispatcher() {
        this(new LinkedBlockingQueue());
    }

    public AsyncDispatcher(BlockingQueue<Event> blockingQueue) {
        super("Dispatcher");
        this.stopped = false;
        this.drainEventsOnStop = false;
        this.drained = true;
        this.waitForDrained = new Object();
        this.blockNewEvents = false;
        this.handlerInstance = new GenericEventHandler();
        this.eventQueue = blockingQueue;
        this.eventDispatchers = new HashMap();
    }

    Runnable createThread() {
        return new Runnable() { // from class: org.apache.hadoop.yarn.event.AsyncDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                while (!AsyncDispatcher.this.stopped && !Thread.currentThread().isInterrupted()) {
                    AsyncDispatcher.this.drained = AsyncDispatcher.this.eventQueue.isEmpty();
                    if (AsyncDispatcher.this.blockNewEvents) {
                        synchronized (AsyncDispatcher.this.waitForDrained) {
                            if (AsyncDispatcher.this.drained) {
                                AsyncDispatcher.this.waitForDrained.notify();
                            }
                        }
                    }
                    try {
                        Event take = AsyncDispatcher.this.eventQueue.take();
                        if (take != null) {
                            AsyncDispatcher.this.dispatch(take);
                        }
                    } catch (InterruptedException e) {
                        if (AsyncDispatcher.this.stopped) {
                            return;
                        }
                        AsyncDispatcher.LOG.warn("AsyncDispatcher thread interrupted", (Throwable) e);
                        return;
                    }
                }
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceInit(Configuration configuration) throws Exception {
        this.exitOnDispatchException = configuration.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
        super.serviceInit(configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStart() throws Exception {
        super.serviceStart();
        this.eventHandlingThread = new Thread(createThread());
        this.eventHandlingThread.setName("AsyncDispatcher event handler");
        this.eventHandlingThread.start();
    }

    public void setDrainEventsOnStop() {
        this.drainEventsOnStop = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.service.AbstractService
    public void serviceStop() throws Exception {
        if (this.drainEventsOnStop) {
            this.blockNewEvents = true;
            LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
            long currentTimeMillis = System.currentTimeMillis() + getConfig().getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 300000L);
            synchronized (this.waitForDrained) {
                while (!this.drained && this.eventHandlingThread != null && this.eventHandlingThread.isAlive() && System.currentTimeMillis() < currentTimeMillis) {
                    this.waitForDrained.wait(1000L);
                    LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + this.eventHandlingThread.getState());
                }
            }
        }
        this.stopped = true;
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
            try {
                this.eventHandlingThread.join(DF.DF_INTERVAL_DEFAULT);
            } catch (InterruptedException e) {
                LOG.warn("Interrupted Exception while stopping", (Throwable) e);
            }
        }
        super.serviceStop();
    }

    protected void dispatch(Event event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString());
        }
        Class declaringClass = event.getType().getDeclaringClass();
        try {
            EventHandler eventHandler = this.eventDispatchers.get(declaringClass);
            if (eventHandler == null) {
                throw new Exception("No handler for registered for " + declaringClass);
            }
            eventHandler.handle(event);
        } catch (Throwable th) {
            LOG.error(FATAL, "Error in dispatcher thread", th);
            if (!this.exitOnDispatchException || ShutdownHookManager.get().isShutdownInProgress() || this.stopped) {
                return;
            }
            Thread thread = new Thread(createShutDownThread());
            thread.setName("AsyncDispatcher ShutDown handler");
            thread.start();
        }
    }

    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public void register(Class<? extends Enum> cls, EventHandler eventHandler) {
        EventHandler<Event> eventHandler2 = this.eventDispatchers.get(cls);
        LOG.info("Registering " + cls + " for " + eventHandler.getClass());
        if (eventHandler2 == null) {
            this.eventDispatchers.put(cls, eventHandler);
            return;
        }
        if (eventHandler2 instanceof MultiListenerHandler) {
            ((MultiListenerHandler) eventHandler2).addHandler(eventHandler);
            return;
        }
        MultiListenerHandler multiListenerHandler = new MultiListenerHandler();
        multiListenerHandler.addHandler(eventHandler2);
        multiListenerHandler.addHandler(eventHandler);
        this.eventDispatchers.put(cls, multiListenerHandler);
    }

    @Override // org.apache.hadoop.yarn.event.Dispatcher
    public EventHandler getEventHandler() {
        return this.handlerInstance;
    }

    Runnable createShutDownThread() {
        return new Runnable() { // from class: org.apache.hadoop.yarn.event.AsyncDispatcher.2
            @Override // java.lang.Runnable
            public void run() {
                AsyncDispatcher.LOG.info("Exiting, bbye..");
                System.exit(-1);
            }
        };
    }

    @VisibleForTesting
    protected boolean isEventThreadWaiting() {
        return this.eventHandlingThread.getState() == Thread.State.WAITING;
    }

    @VisibleForTesting
    protected boolean isDrained() {
        return this.drained;
    }
}
