/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.protobuf.Message;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.util.ReflectionUtils;

@InterfaceAudience.LimitedPrivate(value={"Coprocesssor", "Phoenix"})
@InterfaceStability.Evolving
public class RWQueueRpcExecutor
extends RpcExecutor {
    private static final Log LOG = LogFactory.getLog(RWQueueRpcExecutor.class);
    private final List<BlockingQueue<CallRunner>> queues;
    private final RpcExecutor.QueueBalancer writeBalancer;
    private final RpcExecutor.QueueBalancer readBalancer;
    private final int writeHandlersCount;
    private final int readHandlersCount;
    private final int numWriteQueues;
    private final int numReadQueues;

    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, int maxQueueLength) {
        this(name, handlerCount, numQueues, readShare, maxQueueLength, null, null);
    }

    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, int maxQueueLength, Configuration conf, Abortable abortable) {
        this(name, handlerCount, numQueues, readShare, maxQueueLength, conf, abortable, LinkedBlockingQueue.class, new Object[0]);
    }

    public RWQueueRpcExecutor(String name, int handlerCount, int numQueues, float readShare, int maxQueueLength, Configuration conf, Abortable abortable, Class<? extends BlockingQueue> readQueueClass, Object ... readQueueInitArgs) {
        this(name, RWQueueRpcExecutor.calcNumWriters(handlerCount, readShare), RWQueueRpcExecutor.calcNumReaders(handlerCount, readShare), RWQueueRpcExecutor.calcNumWriters(numQueues, readShare), RWQueueRpcExecutor.calcNumReaders(numQueues, readShare), conf, abortable, LinkedBlockingQueue.class, new Object[]{maxQueueLength}, readQueueClass, ArrayUtils.addAll(new Object[]{maxQueueLength}, readQueueInitArgs));
    }

    public RWQueueRpcExecutor(String name, int writeHandlers, int readHandlers, int numWriteQueues, int numReadQueues, Configuration conf, Abortable abortable, Class<? extends BlockingQueue> writeQueueClass, Object[] writeQueueInitArgs, Class<? extends BlockingQueue> readQueueClass, Object[] readQueueInitArgs) {
        super(name, Math.max(writeHandlers + readHandlers, numWriteQueues + numReadQueues), conf, abortable);
        int i;
        this.writeHandlersCount = Math.max(writeHandlers, numWriteQueues);
        this.readHandlersCount = Math.max(readHandlers, numReadQueues);
        this.numWriteQueues = numWriteQueues;
        this.numReadQueues = numReadQueues;
        this.writeBalancer = RWQueueRpcExecutor.getBalancer(numWriteQueues);
        this.readBalancer = RWQueueRpcExecutor.getBalancer(numReadQueues);
        this.queues = new ArrayList<BlockingQueue<CallRunner>>(this.writeHandlersCount + this.readHandlersCount);
        LOG.debug(name + " writeQueues=" + numWriteQueues + " writeHandlers=" + this.writeHandlersCount + " readQueues=" + numReadQueues + " readHandlers=" + this.readHandlersCount);
        for (i = 0; i < numWriteQueues; ++i) {
            this.queues.add(ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
        }
        for (i = 0; i < numReadQueues; ++i) {
            this.queues.add(ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
        }
    }

    @Override
    protected void startHandlers(int port) {
        this.startHandlers(".write", this.writeHandlersCount, this.queues, 0, this.numWriteQueues, port);
        this.startHandlers(".read", this.readHandlersCount, this.queues, this.numWriteQueues, this.numReadQueues, port);
    }

    @Override
    public void dispatch(CallRunner callTask) throws InterruptedException {
        RpcServer.Call call = callTask.getCall();
        int queueIndex = this.isWriteRequest(call.getHeader(), call.param) ? this.writeBalancer.getNextQueue() : this.numWriteQueues + this.readBalancer.getNextQueue();
        this.queues.get(queueIndex).put(callTask);
    }

    private boolean isWriteRequest(RPCProtos.RequestHeader header, Message param2) {
        String methodName = header.getMethodName();
        if (methodName.equalsIgnoreCase("multi") && param2 instanceof ClientProtos.MultiRequest) {
            ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest)param2;
            for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
                for (ClientProtos.Action action : regionAction.getActionList()) {
                    if (!action.hasMutation()) continue;
                    return true;
                }
            }
        }
        return false;
    }

    @Override
    public int getQueueLength() {
        int length = 0;
        for (BlockingQueue<CallRunner> queue : this.queues) {
            length += queue.size();
        }
        return length;
    }

    @Override
    protected List<BlockingQueue<CallRunner>> getQueues() {
        return this.queues;
    }

    private static int calcNumWriters(int count2, float readShare) {
        return Math.max(1, count2 - Math.max(1, Math.round((float)count2 * readShare)));
    }

    private static int calcNumReaders(int count2, float readShare) {
        return count2 - RWQueueRpcExecutor.calcNumWriters(count2, readShare);
    }
}

