/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import com.google.common.annotations.VisibleForTesting;
import java.lang.ref.WeakReference;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.ipc.FairCallQueueMXBean;
import org.apache.hadoop.ipc.RpcMultiplexer;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.WeightedRoundRobinMultiplexer;
import org.apache.hadoop.metrics2.util.MBeans;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FairCallQueue<E extends Schedulable>
extends AbstractQueue<E>
implements BlockingQueue<E> {
    public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
    public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY = "faircallqueue.priority-levels";
    public static final Logger LOG = LoggerFactory.getLogger(FairCallQueue.class);
    private final ArrayList<BlockingQueue<E>> queues;
    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = this.takeLock.newCondition();
    private RpcScheduler scheduler;
    private RpcMultiplexer multiplexer;
    private final ArrayList<AtomicLong> overflowedCalls;

    private void signalNotEmpty() {
        this.takeLock.lock();
        try {
            this.notEmpty.signal();
        }
        finally {
            this.takeLock.unlock();
        }
    }

    public FairCallQueue(int capacity, String ns, Configuration conf) {
        int numQueues = FairCallQueue.parseNumQueues(ns, conf);
        LOG.info("FairCallQueue is in use with " + numQueues + " queues.");
        this.queues = new ArrayList(numQueues);
        this.overflowedCalls = new ArrayList(numQueues);
        for (int i = 0; i < numQueues; ++i) {
            this.queues.add(new LinkedBlockingQueue(capacity));
            this.overflowedCalls.add(new AtomicLong(0L));
        }
        this.scheduler = new DecayRpcScheduler(numQueues, ns, conf);
        this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
        MetricsProxy mp = MetricsProxy.getInstance(ns);
        mp.setDelegate(this);
    }

    private static int parseNumQueues(String ns, Configuration conf) {
        int retval = conf.getInt(ns + ".faircallqueue.priority-levels", 4);
        if (retval < 1) {
            throw new IllegalArgumentException("numQueues must be at least 1");
        }
        return retval;
    }

    private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
        int numQueues = this.queues.size();
        for (int i = 0; i < numQueues; ++i) {
            int idx = (i + startIdx) % numQueues;
            BlockingQueue<E> queue = this.queues.get(idx);
            if (queue.size() == 0) continue;
            return queue;
        }
        return null;
    }

    @Override
    public void put(E e) throws InterruptedException {
        BlockingQueue<E> q;
        boolean res;
        int priorityLevel = this.scheduler.getPriorityLevel((Schedulable)e);
        int numLevels = this.queues.size();
        while (!(res = (q = this.queues.get(priorityLevel)).offer(e))) {
            this.overflowedCalls.get(priorityLevel).getAndIncrement();
            if (++priorityLevel != numLevels) continue;
            this.queues.get(priorityLevel - 1).put(e);
            break;
        }
        this.signalNotEmpty();
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        int priorityLevel = this.scheduler.getPriorityLevel((Schedulable)e);
        BlockingQueue<E> q = this.queues.get(priorityLevel);
        boolean ret = q.offer(e, timeout, unit);
        this.signalNotEmpty();
        return ret;
    }

    @Override
    public boolean offer(E e) {
        int priorityLevel = this.scheduler.getPriorityLevel((Schedulable)e);
        BlockingQueue<E> q = this.queues.get(priorityLevel);
        boolean ret = q.offer(e);
        this.signalNotEmpty();
        return ret;
    }

    @Override
    public E take() throws InterruptedException {
        int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
        this.takeLock.lockInterruptibly();
        try {
            while (true) {
                Schedulable e;
                BlockingQueue<E> q;
                if ((q = this.getFirstNonEmptyQueue(startIdx)) != null && (e = (Schedulable)q.poll()) != null) {
                    Schedulable schedulable = e;
                    return (E)schedulable;
                }
                this.notEmpty.await();
            }
        }
        finally {
            this.takeLock.unlock();
        }
    }

    /*
     * Exception decompiling
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [0[TRYBLOCK]], but top level block is 7[UNCONDITIONALDOLOOP]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Override
    public E poll() {
        int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
        BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
        if (q == null) {
            return null;
        }
        return (E)((Schedulable)q.poll());
    }

    @Override
    public E peek() {
        BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
        if (q == null) {
            return null;
        }
        return (E)((Schedulable)q.peek());
    }

    @Override
    public int size() {
        int size = 0;
        for (BlockingQueue<E> q : this.queues) {
            size += q.size();
        }
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        throw new NotImplementedException();
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        int sum = 0;
        for (BlockingQueue<? super E> blockingQueue : this.queues) {
            sum += blockingQueue.drainTo(c, maxElements);
        }
        return sum;
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        int sum = 0;
        for (BlockingQueue<? super E> blockingQueue : this.queues) {
            sum += blockingQueue.drainTo(c);
        }
        return sum;
    }

    @Override
    public int remainingCapacity() {
        int sum = 0;
        for (BlockingQueue<E> q : this.queues) {
            sum += q.remainingCapacity();
        }
        return sum;
    }

    public int[] getQueueSizes() {
        int numQueues = this.queues.size();
        int[] sizes = new int[numQueues];
        for (int i = 0; i < numQueues; ++i) {
            sizes[i] = this.queues.get(i).size();
        }
        return sizes;
    }

    public long[] getOverflowedCalls() {
        int numQueues = this.queues.size();
        long[] calls = new long[numQueues];
        for (int i = 0; i < numQueues; ++i) {
            calls[i] = this.overflowedCalls.get(i).get();
        }
        return calls;
    }

    @VisibleForTesting
    public void setScheduler(RpcScheduler newScheduler) {
        this.scheduler = newScheduler;
    }

    @VisibleForTesting
    public void setMultiplexer(RpcMultiplexer newMux) {
        this.multiplexer = newMux;
    }

    private static final class MetricsProxy
    implements FairCallQueueMXBean {
        private static final HashMap<String, MetricsProxy> INSTANCES = new HashMap();
        private WeakReference<FairCallQueue> delegate;
        private int revisionNumber = 0;

        private MetricsProxy(String namespace) {
            MBeans.register(namespace, "FairCallQueue", this);
        }

        public static synchronized MetricsProxy getInstance(String namespace) {
            MetricsProxy mp = INSTANCES.get(namespace);
            if (mp == null) {
                mp = new MetricsProxy(namespace);
                INSTANCES.put(namespace, mp);
            }
            return mp;
        }

        public void setDelegate(FairCallQueue obj) {
            this.delegate = new WeakReference<FairCallQueue>(obj);
            ++this.revisionNumber;
        }

        @Override
        public int[] getQueueSizes() {
            FairCallQueue obj = (FairCallQueue)this.delegate.get();
            if (obj == null) {
                return new int[0];
            }
            return obj.getQueueSizes();
        }

        @Override
        public long[] getOverflowedCalls() {
            FairCallQueue obj = (FairCallQueue)this.delegate.get();
            if (obj == null) {
                return new long[0];
            }
            return obj.getOverflowedCalls();
        }

        @Override
        public int getRevision() {
            return this.revisionNumber;
        }
    }
}

