/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.event.multidispatcher;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.multidispatcher.MultiDispatcherConfig;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.slf4j.Logger;

/**
 * Runs event handlers on a fixed set of worker threads, each with its own bounded queue.
 *
 * <p>The worker is picked from the hash code of the event's lock key, so all events that
 * carry the same lock key are put on the same worker queue. Events without a lock key are
 * always given to the first worker.
 */
public class MultiDispatcherExecutor {
    /** Pause between two checks of the worker queues while draining, in milliseconds. */
    private static final long DRAIN_POLL_INTERVAL_MS = 1000L;
    /** Fixed pause taken right before the workers are interrupted, in milliseconds. */
    private static final long PRE_INTERRUPT_PAUSE_MS = 2000L;

    private final Logger log;
    private final MultiDispatcherConfig config;
    private final MultiDispatcherExecutorThread[] threads;
    private final Clock clock = new MonotonicClock();

    /**
     * Creates the worker threads without starting them; call {@link #start()} afterwards.
     *
     * @param log logger used by this executor and by its workers
     * @param config supplies the number of workers, the capacity of each worker queue and
     *     the graceful stop period
     * @param dispatcherName name of the thread group; also the prefix of every worker name
     */
    public MultiDispatcherExecutor(Logger log, MultiDispatcherConfig config, String dispatcherName) {
        this.log = log;
        this.config = config;
        this.threads = new MultiDispatcherExecutorThread[config.getDefaultPoolSize()];
        ThreadGroup group = new ThreadGroup(dispatcherName);
        for (int i = 0; i < this.threads.length; ++i) {
            this.threads[i] = new MultiDispatcherExecutorThread(group, i, config.getQueueSize());
        }
    }

    /** Starts every worker thread. */
    public void start() {
        for (MultiDispatcherExecutorThread worker : this.threads) {
            worker.start();
        }
    }

    /**
     * Queues {@code runnable} on the worker selected by the lock key of {@code event}.
     *
     * @param event event whose lock key selects the worker
     * @param runnable work to run on the selected worker
     * @throws IllegalStateException if the queue of the selected worker is full
     */
    public void execute(Event event, Runnable runnable) {
        String lockKey = event.getLockKey();
        int threadIndex = 0;
        if (lockKey != null) {
            int hash = lockKey.hashCode();
            if (hash != Integer.MIN_VALUE) {
                // The remainder lies strictly between -length and length, so abs() is safe.
                threadIndex = Math.abs(hash % this.threads.length);
            }
        }
        MultiDispatcherExecutorThread thread = this.threads[threadIndex];
        thread.add(runnable);
        this.log.trace("The {} with lock key {} will be handled by {}",
            event.getType(), lockKey, thread.getName());
    }

    /**
     * Waits for the worker queues to drain, then interrupts every worker.
     *
     * <p>The queues are polled until all of them are empty or the configured graceful stop
     * period has elapsed, whichever happens first.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stop() throws InterruptedException {
        // The seconds value is scaled by 1000, i.e. clock values are treated as milliseconds.
        long timeOut = this.clock.getTime() + (long) this.config.getGracefulStopSeconds() * 1000L;
        // This has to be a loop: a single check would wait for one poll interval at most,
        // no matter how long the configured graceful stop period is.
        while (this.hasQueuedWork() && this.clock.getTime() < timeOut) {
            this.log.debug("Not all event queue is empty, waiting to drain ...");
            Thread.sleep(DRAIN_POLL_INTERVAL_MS);
        }
        // A task that a worker already took from its queue is not counted by queueSize().
        // NOTE(review): this pause presumably lets such in-flight tasks finish - confirm.
        Thread.sleep(PRE_INTERRUPT_PAUSE_MS);
        for (MultiDispatcherExecutorThread thread : this.threads) {
            thread.interrupt();
        }
    }

    /**
     * Returns the number of queued, not yet taken tasks of each worker.
     *
     * @return map from worker thread name to the current size of its queue
     */
    public Map<String, Long> getQueuesSize() {
        return Arrays.stream(this.threads)
            .collect(Collectors.toMap(Thread::getName, MultiDispatcherExecutorThread::queueSize));
    }

    /** Tells whether at least one worker still has tasks waiting in its queue. */
    private boolean hasQueuedWork() {
        return Arrays.stream(this.threads).anyMatch(worker -> 0L < worker.queueSize());
    }

    /**
     * Worker thread that runs the tasks of its own queue one after the other until it is
     * interrupted.
     */
    private final class MultiDispatcherExecutorThread
    extends Thread {
        private final BlockingQueue<Runnable> queue;

        /**
         * @param group thread group of the worker; its name is the prefix of the thread name
         * @param index position of the worker, used as the suffix of the thread name
         * @param queueSize capacity of the task queue
         */
        MultiDispatcherExecutorThread(ThreadGroup group, int index, int queueSize) {
            super(group, String.format("%s-worker-%d", group.getName(), index));
            this.queue = new LinkedBlockingQueue<Runnable>(queueSize);
        }

        /** Appends a task to the queue; throws IllegalStateException when the queue is full. */
        void add(Runnable runnable) {
            this.queue.add(runnable);
        }

        /** Returns the number of tasks currently waiting in the queue. */
        long queueSize() {
            return this.queue.size();
        }

        /**
         * Takes and runs tasks until the thread is interrupted while waiting for the next one.
         *
         * <p>NOTE(review): nothing here catches unchecked exceptions, so a task that throws
         * one ends this worker thread.
         */
        @Override
        public void run() {
            try {
                while (true) {
                    this.queue.take().run();
                }
            }
            catch (InterruptedException e) {
                MultiDispatcherExecutor.this.log.warn("{} get interrupted", this.getName());
            }
        }
    }
}

