package org.apache.tez.common;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/tez/common/AsyncDispatcher.class */
public class AsyncDispatcher extends CompositeService implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);
    private final String name;
    private final BlockingQueue<Event> eventQueue;
    private volatile boolean stopped;
    private volatile boolean drainEventsOnStop;
    private volatile boolean drained;
    private Object waitForDrained;
    private volatile boolean blockNewEvents;
    private EventHandler handlerInstance;
    private Thread eventHandlingThread;
    protected final Map<Class<? extends Enum>, EventHandler> eventHandlers;
    protected final Map<Class<? extends Enum>, AsyncDispatcher> eventDispatchers;
    protected final Map<Class<? extends Enum>, AsyncDispatcherConcurrent> concurrentEventDispatchers;
    private boolean exitOnDispatchException;

    /* loaded from: input_file:org/apache/tez/common/AsyncDispatcher$GenericEventHandler.class */
    class GenericEventHandler implements EventHandler<Event> {
        GenericEventHandler() {
        }

        public void handle(Event event) {
            if (AsyncDispatcher.this.stopped || AsyncDispatcher.this.blockNewEvents) {
                return;
            }
            AsyncDispatcher.this.drained = false;
            Class declaringClass = event.getType().getDeclaringClass();
            AsyncDispatcher asyncDispatcher = AsyncDispatcher.this.eventDispatchers.get(declaringClass);
            if (asyncDispatcher != null) {
                asyncDispatcher.getEventHandler().handle(event);
                return;
            }
            AsyncDispatcherConcurrent asyncDispatcherConcurrent = AsyncDispatcher.this.concurrentEventDispatchers.get(declaringClass);
            if (asyncDispatcherConcurrent != null) {
                asyncDispatcherConcurrent.getEventHandler().handle(event);
                return;
            }
            int size = AsyncDispatcher.this.eventQueue.size();
            if (size != 0 && size % 1000 == 0) {
                AsyncDispatcher.LOG.info("Size of event-queue is " + size);
            }
            int remainingCapacity = AsyncDispatcher.this.eventQueue.remainingCapacity();
            if (remainingCapacity < 1000) {
                AsyncDispatcher.LOG.warn("Very low remaining capacity in the event-queue: " + remainingCapacity);
            }
            try {
                AsyncDispatcher.this.eventQueue.put(event);
            } catch (InterruptedException e) {
                if (!AsyncDispatcher.this.stopped) {
                    AsyncDispatcher.LOG.warn("AsyncDispatcher thread interrupted", e);
                }
                throw new YarnRuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/common/AsyncDispatcher$MultiListenerHandler.class */
    public static class MultiListenerHandler implements EventHandler<Event> {
        List<EventHandler<Event>> listofHandlers = new ArrayList();

        public void handle(Event event) {
            Iterator<EventHandler<Event>> it = this.listofHandlers.iterator();
            while (it.hasNext()) {
                it.next().handle(event);
            }
        }

        void addHandler(EventHandler<Event> eventHandler) {
            this.listofHandlers.add(eventHandler);
        }
    }

    public AsyncDispatcher(String str) {
        this(str, new LinkedBlockingQueue());
    }

    public AsyncDispatcher(String str, BlockingQueue<Event> blockingQueue) {
        super(str);
        this.stopped = false;
        this.drainEventsOnStop = false;
        this.drained = true;
        this.waitForDrained = new Object();
        this.blockNewEvents = false;
        this.handlerInstance = new GenericEventHandler();
        this.eventHandlers = Maps.newHashMap();
        this.eventDispatchers = Maps.newHashMap();
        this.concurrentEventDispatchers = Maps.newHashMap();
        this.name = str;
        this.eventQueue = blockingQueue;
    }

    public Runnable createThread() {
        return new Runnable() { // from class: org.apache.tez.common.AsyncDispatcher.1
            @Override // java.lang.Runnable
            public void run() {
                while (!AsyncDispatcher.this.stopped && !Thread.currentThread().isInterrupted()) {
                    AsyncDispatcher.this.drained = AsyncDispatcher.this.eventQueue.isEmpty();
                    if (AsyncDispatcher.this.blockNewEvents) {
                        synchronized (AsyncDispatcher.this.waitForDrained) {
                            if (AsyncDispatcher.this.drained) {
                                AsyncDispatcher.this.waitForDrained.notify();
                            }
                        }
                    }
                    try {
                        Event event = (Event) AsyncDispatcher.this.eventQueue.take();
                        if (event != null) {
                            AsyncDispatcher.this.dispatch(event);
                        }
                    } catch (InterruptedException e) {
                        if (AsyncDispatcher.this.stopped) {
                            return;
                        }
                        AsyncDispatcher.LOG.warn("AsyncDispatcher thread interrupted", e);
                        return;
                    }
                }
            }
        };
    }

    protected void serviceInit(Configuration configuration) throws Exception {
        this.exitOnDispatchException = configuration.getBoolean("yarn.dispatcher.exit-on-error", false);
        super.serviceInit(configuration);
    }

    protected void serviceStart() throws Exception {
        this.eventHandlingThread = new Thread(createThread());
        this.eventHandlingThread.setName("Dispatcher thread {" + this.name + "}");
        this.eventHandlingThread.start();
        super.serviceStart();
    }

    public void setDrainEventsOnStop() {
        this.drainEventsOnStop = true;
    }

    protected void serviceStop() throws Exception {
        if (this.drainEventsOnStop) {
            this.blockNewEvents = true;
            LOG.info("AsyncDispatcher is draining to stop, ignoring any new events.");
            synchronized (this.waitForDrained) {
                while (!this.drained && this.eventHandlingThread.isAlive()) {
                    this.waitForDrained.wait(1000L);
                    LOG.info("Waiting for AsyncDispatcher to drain.");
                }
            }
        }
        this.stopped = true;
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
            try {
                this.eventHandlingThread.join();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted Exception while stopping", e);
            }
        }
        super.serviceStop();
    }

    protected void dispatch(Event event) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dispatching the event " + event.getClass().getName() + "." + event.toString());
        }
        Class declaringClass = event.getType().getDeclaringClass();
        try {
            EventHandler eventHandler = this.eventHandlers.get(declaringClass);
            if (eventHandler == null) {
                throw new Exception("No handler for registered for " + declaringClass);
            }
            eventHandler.handle(event);
        } catch (Throwable th) {
            LOG.error("Error in dispatcher thread", th);
            if (!this.exitOnDispatchException || ShutdownHookManager.get().isShutdownInProgress() || this.stopped) {
                return;
            }
            Thread thread = new Thread(createShutDownThread());
            thread.setName("AsyncDispatcher ShutDown handler");
            thread.start();
        }
    }

    private void checkForExistingHandler(Class<? extends Enum> cls) {
        Preconditions.checkState(this.eventHandlers.get(cls) == null, "Cannot register same event on multiple dispatchers");
    }

    private void checkForExistingDispatcher(Class<? extends Enum> cls) {
        Preconditions.checkState(this.eventDispatchers.get(cls) == null, "Multiple dispatchers cannot be registered for: " + cls.getName());
    }

    private void checkForExistingConcurrentDispatcher(Class<? extends Enum> cls) {
        Preconditions.checkState(this.concurrentEventDispatchers.get(cls) == null, "Multiple concurrent dispatchers cannot be registered for: " + cls.getName());
    }

    private void checkForExistingDispatchers(boolean z, Class<? extends Enum> cls) {
        if (z) {
            checkForExistingHandler(cls);
        }
        checkForExistingDispatcher(cls);
        checkForExistingConcurrentDispatcher(cls);
    }

    public void register(Class<? extends Enum> cls, EventHandler eventHandler) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        EventHandler<Event> eventHandler2 = this.eventHandlers.get(cls);
        checkForExistingDispatchers(false, cls);
        LOG.info("Registering " + cls + " for " + eventHandler.getClass());
        if (eventHandler2 == null) {
            this.eventHandlers.put(cls, eventHandler);
            return;
        }
        if (eventHandler2 instanceof MultiListenerHandler) {
            ((MultiListenerHandler) eventHandler2).addHandler(eventHandler);
            return;
        }
        MultiListenerHandler multiListenerHandler = new MultiListenerHandler();
        multiListenerHandler.addHandler(eventHandler2);
        multiListenerHandler.addHandler(eventHandler);
        this.eventHandlers.put(cls, multiListenerHandler);
    }

    public void registerAndCreateDispatcher(Class<? extends Enum> cls, EventHandler eventHandler, String str) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        checkForExistingDispatchers(true, cls);
        LOG.info("Registering " + cls + " for independent dispatch using: " + eventHandler.getClass());
        AsyncDispatcher asyncDispatcher = new AsyncDispatcher(str);
        asyncDispatcher.register(cls, eventHandler);
        this.eventDispatchers.put(cls, asyncDispatcher);
        addIfService(asyncDispatcher);
    }

    public AsyncDispatcherConcurrent registerAndCreateDispatcher(Class<? extends Enum> cls, EventHandler eventHandler, String str, int i) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        checkForExistingDispatchers(true, cls);
        LOG.info("Registering " + cls + " for concurrent dispatch using: " + eventHandler.getClass());
        AsyncDispatcherConcurrent asyncDispatcherConcurrent = new AsyncDispatcherConcurrent(str, i);
        asyncDispatcherConcurrent.register(cls, eventHandler);
        this.concurrentEventDispatchers.put(cls, asyncDispatcherConcurrent);
        addIfService(asyncDispatcherConcurrent);
        return asyncDispatcherConcurrent;
    }

    public void registerWithExistingDispatcher(Class<? extends Enum> cls, EventHandler eventHandler, AsyncDispatcherConcurrent asyncDispatcherConcurrent) {
        Preconditions.checkState(getServiceState() == Service.STATE.NOTINITED);
        checkForExistingDispatchers(true, cls);
        LOG.info("Registering " + cls + " with existing concurrent dispatch using: " + eventHandler.getClass());
        asyncDispatcherConcurrent.register(cls, eventHandler);
        this.concurrentEventDispatchers.put(cls, asyncDispatcherConcurrent);
    }

    public EventHandler getEventHandler() {
        return this.handlerInstance;
    }

    Runnable createShutDownThread() {
        return new Runnable() { // from class: org.apache.tez.common.AsyncDispatcher.2
            @Override // java.lang.Runnable
            public void run() {
                AsyncDispatcher.LOG.info("Exiting, bbye..");
                System.exit(-1);
            }
        };
    }

    @InterfaceAudience.Private
    public int getQueueSize() {
        return this.eventQueue.size();
    }
}
