/*
 * Decompiled with CFR 0.152.
 */
package org.hbase.async;

import com.mapr.fs.MapRHTable;
import com.mapr.fs.MapRResultScanner;
import com.mapr.fs.jni.MapRCallBackQueue;
import com.mapr.fs.jni.MapRGet;
import com.mapr.fs.jni.MapRIncrement;
import com.mapr.fs.jni.MapRPut;
import com.mapr.fs.jni.MapRResult;
import com.mapr.fs.jni.MapRScan;
import com.stumbleupon.async.Deferred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.Bytes;
import org.hbase.async.CompareAndSetRequest;
import org.hbase.async.DeleteRequest;
import org.hbase.async.GetRequest;
import org.hbase.async.HBaseRpc;
import org.hbase.async.KeyValue;
import org.hbase.async.MapRConverter;
import org.hbase.async.NoSuchColumnFamilyException;
import org.hbase.async.Scanner;
import org.hbase.async.UnknownScannerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapRThreadPool
implements MapRCallBackQueue {
    private static final Logger LOG = LoggerFactory.getLogger(MapRThreadPool.class);
    public static final int MIN_THREADS = 5;
    public static final int MAX_THREADS = 10;
    public static final int MIN_SCAN_THREADS = 3;
    public static final int MAX_SCAN_THREADS = 32;
    private ExecutorService pool;
    BlockingQueue<AsyncHBaseRpc> asyncrpcQueue = new LinkedBlockingQueue<AsyncHBaseRpc>();
    private ExecutorService scanpool;
    BlockingQueue<MapRScanPlus> scanRequestQueue;

    MapRThreadPool() {
        int i;
        this.pool = new ThreadPoolExecutor(6, 10, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        for (i = 0; i < 5; ++i) {
            this.pool.execute(new RpcRunnable());
        }
        this.scanpool = new ThreadPoolExecutor(3, 32, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        for (i = 0; i < 3; ++i) {
            this.scanpool.execute(new ScanRpcRunnable(this));
        }
        this.scanRequestQueue = new LinkedBlockingQueue<MapRScanPlus>();
    }

    public boolean hasRemainingCapacity() {
        return this.scanRequestQueue.remainingCapacity() != 0;
    }

    public void addToQ(MapRScanPlus mscanPlus) {
        this.scanRequestQueue.add(mscanPlus);
    }

    public MapRScanPlus takeFromQ() {
        MapRScanPlus msp;
        try {
            msp = this.scanRequestQueue.take();
        }
        catch (InterruptedException e) {
            msp = null;
        }
        return msp;
    }

    public void sendRpc(HBaseRpc rpc, MapRHTable mTable) {
        AsyncHBaseRpc asrpc = new AsyncHBaseRpc(rpc, mTable);
        this.asyncrpcQueue.add(asrpc);
    }

    public void doFlush(Deferred<?> deferred, MapRHTable mTable) {
        AsyncHBaseRpc asrpc = new AsyncHBaseRpc(null, mTable, deferred);
        this.asyncrpcQueue.add(asrpc);
    }

    public void closeScanner(Deferred<?> d, MapRHTable mTable, Scanner scan) {
        AsyncHBaseRpc asrpc = new AsyncHBaseRpc(null, mTable, d, scan);
        this.asyncrpcQueue.add(asrpc);
    }

    public void runCallbackChain(LinkedList<Object> requests, LinkedList<Object> responses) {
        try {
            this.pool.execute(new CallBackRunnable(requests, responses));
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    public void shutdown() {
        this.pool.shutdownNow();
        this.scanpool.shutdownNow();
    }

    public void addRequest(HBaseRpc dummyRpc, Scanner scan, MapRThreadPool mpool) {
        try {
            if (scan.mscan == null) {
                scan.mscan = MapRConverter.toMapRScan(scan, scan.mTable);
                if (scan.getFilterMsg() != null) {
                    scan.mscan.setFilter(scan.getFilterMsg().toByteArray());
                }
            }
            scan.mscan.startRow = scan.getCurrentKey();
            scan.mscan.stopRow = scan.getStopKey();
            MapRScanPlus mscanPlus = new MapRScanPlus(scan.mscan, dummyRpc, scan, scan.mTable);
            if (!mpool.hasRemainingCapacity()) {
                throw new UnknownScannerException("Failed to add scan request to queue", dummyRpc);
            }
            mpool.addToQ(mscanPlus);
        }
        catch (Exception e) {
            LOG.error("Exception in addRequest: " + e.getMessage());
            dummyRpc.callback(e);
            Deferred.fromError((Exception)e);
        }
    }

    static class CallBackRunnable
    implements Runnable {
        LinkedList<Object> requests;
        LinkedList<Object> responses;

        public CallBackRunnable(LinkedList<Object> requests, LinkedList<Object> responses) {
            this.requests = requests;
            this.responses = responses;
        }

        @Override
        public void run() {
            for (Object e : this.requests) {
                Object result = this.responses.remove();
                try {
                    if (e == null || !(e instanceof HBaseRpc)) continue;
                    HBaseRpc rpc = (HBaseRpc)e;
                    rpc.callback(result);
                }
                catch (Exception e2) {}
            }
        }
    }

    static class ScanRpcRunnable
    implements Runnable {
        MapRThreadPool mTpool;

        ScanRpcRunnable(MapRThreadPool mtp) {
            this.mTpool = mtp;
        }

        @Override
        public void run() {
            while (true) {
                HBaseRpc dummyRpc = null;
                try {
                    MapRScanPlus mscanPlus = this.mTpool.takeFromQ();
                    if (mscanPlus == null) break;
                    MapRScan mscan = mscanPlus.mscan;
                    Scanner scan = mscanPlus.scan;
                    dummyRpc = mscanPlus.dummyRpc;
                    MapRHTable mTable = mscanPlus.mtbl;
                    if (scan.mresultScanner == null) {
                        scan.mresultScanner = new MapRResultScanner(mscan, mTable);
                    }
                    MapRResult[] mresults = scan.mresultScanner.nextRows(scan.getMaxNumRows());
                    int num_rows = 0;
                    for (MapRResult mr : mresults) {
                        if (mr.isEmpty()) break;
                        ++num_rows;
                    }
                    if (num_rows == 0) {
                        scan.mresultScanner.releaseTempMemory();
                        if (dummyRpc == null) continue;
                        dummyRpc.callback(null);
                        continue;
                    }
                    ArrayList<ArrayList<KeyValue>> rows = new ArrayList<ArrayList<KeyValue>>(num_rows);
                    for (int i = 0; i < num_rows; ++i) {
                        int keyLen = mresults[i].getKeyLength();
                        byte[] key = new byte[keyLen];
                        mresults[i].getByteBuf().position(0);
                        mresults[i].getByteBuf().get(key, 0, keyLen);
                        ArrayList<KeyValue> kv = MapRConverter.toAsyncHBaseResult(mresults[i], key, mTable);
                        rows.add(kv);
                    }
                    scan.mresultScanner.releaseTempMemory();
                    dummyRpc.callback(rows);
                }
                catch (Exception e) {
                    if (dummyRpc == null) continue;
                    LOG.error("Exception in scanner thread: " + e.getMessage() + "thrd=" + Thread.currentThread().getId());
                    dummyRpc.callback(e);
                    Deferred.fromError((Exception)e);
                }
            }
        }
    }

    class RpcRunnable
    implements Runnable {
        @Override
        public void run() {
            while (true) {
                AsyncHBaseRpc asrpc;
                try {
                    asrpc = MapRThreadPool.this.asyncrpcQueue.take();
                }
                catch (InterruptedException e) {
                    break;
                }
                catch (Exception e) {
                    Deferred.fromError((Exception)e);
                    continue;
                }
                HBaseRpc rpc = asrpc.rpc;
                MapRHTable mTable = asrpc.mTable;
                Scanner sc = asrpc.scanner;
                if (sc != null) {
                    sc.mresultScanner.close();
                    asrpc.deferred.callback(null);
                    continue;
                }
                if (rpc == null) {
                    try {
                        mTable.asyncFlush();
                        asrpc.deferred.callback(null);
                    }
                    catch (Exception e) {
                        asrpc.deferred.callback((Object)e);
                        Deferred.fromError((Exception)e);
                    }
                    continue;
                }
                if (rpc instanceof GetRequest) {
                    try {
                        GetRequest grpc = (GetRequest)rpc;
                        MapRGet mGet = MapRConverter.toMapRGet(grpc, mTable, Bytes.toString(grpc.family()));
                        MapRResult result = mTable.get(mGet);
                        ArrayList<KeyValue> kvArr = MapRConverter.toAsyncHBaseResult(result, grpc.key(), mTable);
                        if (mGet.getArena() != 0L) {
                            mTable.freeArena(mGet.getArena());
                        }
                        grpc.callback(kvArr);
                    }
                    catch (IllegalArgumentException ie) {
                        LOG.error("Exception in async get(): " + ie.getMessage());
                        NoSuchColumnFamilyException e = new NoSuchColumnFamilyException("Invalid column family", rpc);
                        rpc.callback(e);
                        Deferred.fromError((Exception)e);
                    }
                    catch (Exception e) {
                        LOG.error("Exception in async get(): " + e.getMessage());
                        rpc.callback(e);
                        Deferred.fromError((Exception)e);
                        return;
                    }
                }
                if (rpc instanceof AtomicIncrementRequest) {
                    try {
                        AtomicIncrementRequest arpc = (AtomicIncrementRequest)rpc;
                        MapRIncrement mincr = MapRConverter.toMapRIncrement(arpc.key(), arpc.family(), arpc.qualifier(), arpc.getAmount(), mTable);
                        mTable.increment(mincr);
                        arpc.callback(mincr.newValues[0]);
                    }
                    catch (Exception e) {
                        LOG.error("Exception in async AtomicIncrementRequest: " + e.getMessage());
                        rpc.callback(e);
                        Deferred.fromError((Exception)e);
                        return;
                    }
                }
                if (rpc instanceof CompareAndSetRequest) {
                    try {
                        MapRPut mput;
                        String family;
                        int id = 0;
                        CompareAndSetRequest crpc = (CompareAndSetRequest)rpc;
                        boolean useCf = false;
                        if (crpc.family() != null && !(family = Bytes.toString(crpc.family())).isEmpty()) {
                            useCf = true;
                            try {
                                id = mTable.getFamilyId(family);
                            }
                            catch (IOException ioe) {
                                throw new IllegalArgumentException("Invalid column family " + family, ioe);
                            }
                        }
                        byte[][] qualifiers = new byte[][]{crpc.qualifier()};
                        byte[][] values = new byte[1][];
                        boolean res = false;
                        if (crpc.value() == null) {
                            values[0] = new byte[0];
                            mput = new MapRPut(crpc.key(), id, (byte[][])qualifiers, (byte[][])values, Long.MAX_VALUE, 24);
                            res = mTable.checkAndDelete(crpc.key(), useCf, id, crpc.qualifier(), crpc.expectedValue(), mput);
                        } else {
                            values[0] = crpc.value();
                            mput = new MapRPut(crpc.key(), id, (byte[][])qualifiers, (byte[][])values, Long.MAX_VALUE);
                            res = mTable.checkAndPut(crpc.key(), useCf, id, crpc.qualifier(), crpc.expectedValue(), mput);
                        }
                        crpc.callback(new Boolean(res));
                    }
                    catch (Exception e) {
                        LOG.error("Exception in async CompareAndSetRequest: " + e.getMessage());
                        rpc.callback(e);
                        Deferred.fromError((Exception)e);
                        return;
                    }
                }
                if (!(rpc instanceof DeleteRequest)) continue;
                try {
                    String family;
                    int id = 0;
                    DeleteRequest drpc = (DeleteRequest)rpc;
                    if (drpc.family() != null && !(family = Bytes.toString(drpc.family())).isEmpty()) {
                        try {
                            id = mTable.getFamilyId(family);
                        }
                        catch (IOException ioe) {
                            throw new IllegalArgumentException("Invalid column family " + family, ioe);
                        }
                    }
                    byte[][] values = new byte[][]{new byte[0]};
                    byte type = drpc.family() == DeleteRequest.WHOLE_ROW ? (byte)18 : 24;
                    MapRPut mput = type == 24 ? new MapRPut(drpc.key(), id, drpc.qualifiers(), (byte[][])values, drpc.timestamp(), type) : new MapRPut(drpc.key(), id, (byte[][])values, (byte[][])values, drpc.timestamp(), type);
                    mTable.delete(mput);
                    drpc.callback(null);
                }
                catch (Exception e) {
                    LOG.error("Exception in async CompareAndSetRequest: " + e.getMessage());
                    rpc.callback(e);
                    Deferred.fromError((Exception)e);
                    return;
                }
            }
        }
    }

    static class MapRScanPlus {
        MapRScan mscan;
        HBaseRpc dummyRpc;
        Scanner scan;
        MapRHTable mtbl;

        MapRScanPlus(MapRScan mscan, HBaseRpc dummyRpc, Scanner scan, MapRHTable mt) {
            this.mscan = mscan;
            this.dummyRpc = dummyRpc;
            this.scan = scan;
            this.mtbl = mt;
        }
    }

    static class AsyncHBaseRpc {
        HBaseRpc rpc;
        MapRHTable mTable;
        Deferred<?> deferred;
        Scanner scanner;

        public AsyncHBaseRpc(HBaseRpc rpc, MapRHTable mTable) {
            this.rpc = rpc;
            this.mTable = mTable;
            this.deferred = null;
            this.scanner = null;
        }

        public AsyncHBaseRpc(HBaseRpc rpc, MapRHTable mTable, Deferred<?> deferred) {
            this(rpc, mTable);
            this.deferred = deferred;
        }

        public AsyncHBaseRpc(HBaseRpc rpc, MapRHTable mTable, Deferred<?> deferred, Scanner scanner) {
            this(rpc, mTable, deferred);
            this.scanner = scanner;
        }
    }
}

