/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.proto;

import bk-shade.com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.ServerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NIOServerFactory
extends Thread {
    /** Statistics holder created together with this factory. */
    ServerStats stats = new ServerStats();
    /** Logger used by the factory and by its inner connection classes. */
    Logger LOG = LoggerFactory.getLogger(NIOServerFactory.class);
    /** Listening channel; opened, bound and registered with {@link #selector} by the constructor. */
    ServerSocketChannel ss;
    /** Selector on which the listening channel and every accepted connection are registered. */
    Selector selector = Selector.open();
    /** 64 KiB direct buffer in which {@code Cnxn.doIO} stages queued output before each socket write. */
    ByteBuffer directBuffer = ByteBuffer.allocateDirect(65536);
    /** Currently tracked connections; every access in this class synchronizes on the set itself. */
    HashSet<Cnxn> cnxns = new HashSet<Cnxn>();
    /** {@code Cnxn.sendResponse} re-enables reads while a connection's outstanding count is below this. */
    int outstandingLimit = 2000;
    /** Callback that receives every complete request packet. */
    PacketProcessor processor;
    /** NOTE(review): not read or written anywhere else in this class; purpose not visible here. */
    long minLatency = 99999999L;
    /** Server configuration supplied to the constructor. */
    ServerConfiguration conf;
    /** Set to {@code true} when the selector loop is terminated by a non-{@code Exception} throwable. */
    private AtomicBoolean crashed = new AtomicBoolean(false);
    /** Monitor guarding {@link #suspended}; the selector loop waits on it while suspended. */
    private Object suspensionLock = new Object();
    /** While {@code true} the selector loop blocks before processing selected keys. */
    private boolean suspended = false;
    /** Sentinel buffer: {@code Cnxn.doIO} closes the connection when it finds this at the head of the output queue. */
    static final ByteBuffer closeConn = ByteBuffer.allocate(0);

    public NIOServerFactory(ServerConfiguration conf, PacketProcessor processor) throws IOException {
        super("NIOServerFactory-" + conf.getBookiePort());
        this.setDaemon(true);
        this.processor = processor;
        this.conf = conf;
        this.ss = ServerSocketChannel.open();
        if (conf.getListeningInterface() == null) {
            this.ss.socket().bind(new InetSocketAddress(conf.getBookiePort()));
        } else {
            this.ss.socket().bind(Bookie.getBookieAddress(conf));
        }
        this.ss.configureBlocking(false);
        this.ss.register(this.selector, 16);
    }

    /**
     * Returns the local socket address of the listening server socket.
     *
     * @return the server socket's local socket address, cast to {@link InetSocketAddress}
     */
    public InetSocketAddress getLocalAddress() {
        final Object bound = this.ss.socket().getLocalSocketAddress();
        return (InetSocketAddress) bound;
    }

    /**
     * Adds a connection to the set of tracked connections, holding the set's monitor.
     *
     * @param cnxn the connection to track
     */
    private void addCnxn(Cnxn cnxn) {
        synchronized (this.cnxns) {
            this.cnxns.add(cnxn);
        }
    }

    /**
     * Reports whether the server is up.
     *
     * @return {@code true} only when the listening socket is not closed and this thread is alive
     */
    public boolean isRunning() {
        if (this.ss.socket().isClosed()) {
            return false;
        }
        return this.isAlive();
    }

    /**
     * Reports whether the selector loop was terminated by a throwable that is not an
     * {@code Exception}.
     *
     * @return the current value of the crash flag
     */
    boolean hasCrashed() {
        final boolean crashedNow = this.crashed.get();
        return crashedNow;
    }

    /**
     * Marks processing as suspended. The selector loop checks this flag after each select
     * and waits on the suspension lock until {@link #resumeProcessing()} clears it.
     */
    @VisibleForTesting
    public void suspendProcessing() {
        synchronized (this.suspensionLock) {
            this.suspended = true;
        }
    }

    /**
     * Clears the suspended flag and notifies one thread waiting on the suspension lock,
     * which lets a selector loop blocked by {@link #suspendProcessing()} continue.
     */
    @VisibleForTesting
    public void resumeProcessing() {
        final Object lock = this.suspensionLock;
        synchronized (lock) {
            this.suspended = false;
            lock.notify();
        }
    }

    /**
     * Selector loop: runs until the listening socket is closed.
     *
     * <p>Each iteration selects for at most one second, blocks while processing is
     * suspended, then handles the selected keys in random order: accept-ready keys produce a
     * new {@link Cnxn}, read/write-ready keys are handed to {@link Cnxn#doIO}.
     *
     * <p>An {@code Exception} is logged and the loop continues; any other {@code Throwable}
     * is logged, sets the crash flag and ends the loop. On exit all connections are closed
     * via {@link #clear()}.
     */
    @Override
    public void run() {
        while (!this.ss.socket().isClosed()) {
            try {
                this.selector.select(1000L);
                synchronized (this.suspensionLock) {
                    while (this.suspended) {
                        this.suspensionLock.wait();
                    }
                }
                Set<SelectionKey> selected;
                synchronized (this) {
                    selected = this.selector.selectedKeys();
                }
                ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
                Collections.shuffle(selectedList);
                try {
                    for (SelectionKey k : selectedList) {
                        if (!k.isValid()) {
                            // Cancelled while an earlier key of this batch was handled;
                            // readyOps() would throw CancelledKeyException.
                            continue;
                        }
                        int ready = k.readyOps();
                        if ((ready & SelectionKey.OP_ACCEPT) != 0) {
                            this.acceptConnection(k);
                        } else if ((ready & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                            Cnxn c = (Cnxn) k.attachment();
                            c.doIO(k);
                        }
                    }
                } finally {
                    // Always reset the selected-key set. If a failure left keys behind,
                    // the next select() would keep reporting their stale readiness.
                    selected.clear();
                }
            }
            catch (Exception e) {
                this.LOG.warn("Exception in server socket loop: " + this.ss.socket().getInetAddress(), (Throwable)e);
            }
            catch (Throwable e) {
                this.LOG.error("Error in server socket loop: " + this.ss.socket().getInetAddress(), e);
                this.crashed.set(true);
                break;
            }
        }
        this.LOG.info("NIOServerCnxn factory exitedloop.");
        this.clear();
    }

    /**
     * Accepts one pending connection on the listening channel and starts tracking it.
     *
     * <p>A non-blocking {@code accept()} returns {@code null} when no connection is pending;
     * that case is ignored. If configuring or registering the new channel fails, the channel
     * is closed before the exception propagates so the socket is not leaked.
     *
     * @param acceptKey the accept-ready key of the listening channel
     * @throws IOException if accepting or setting up the connection fails
     */
    private void acceptConnection(SelectionKey acceptKey) throws IOException {
        SocketChannel sc = ((ServerSocketChannel) acceptKey.channel()).accept();
        if (sc == null) {
            return;
        }
        boolean tracked = false;
        try {
            sc.configureBlocking(false);
            SelectionKey sk = sc.register(this.selector, SelectionKey.OP_READ);
            Cnxn cnxn = new Cnxn(sc, sk);
            sk.attach(cnxn);
            this.addCnxn(cnxn);
            tracked = true;
        } finally {
            if (!tracked) {
                try {
                    sc.close();
                } catch (IOException e) {
                    this.LOG.debug("Failed to close channel after unsuccessful connection setup", e);
                }
            }
        }
    }

    /**
     * Wakes the selector and closes every tracked connection.
     *
     * <p>Closing is best effort: a failure while closing one connection is logged and the
     * remaining connections are still closed.
     */
    public synchronized void clear() {
        this.selector.wakeup();
        synchronized (this.cnxns) {
            Iterator<Cnxn> it = this.cnxns.iterator();
            while (it.hasNext()) {
                Cnxn cnxn = it.next();
                // Remove through the iterator first: close() also removes the connection
                // from cnxns, and at that point the element is already gone.
                it.remove();
                try {
                    cnxn.close();
                }
                catch (Exception e) {
                    this.LOG.warn("Exception while closing connection " + cnxn, e);
                }
            }
        }
    }

    /**
     * Stops the server: closes the listening socket, closes all connections, interrupts the
     * selector thread and waits for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * so callers can still observe the interruption. Once the selector thread is no longer
     * alive the selector is closed; otherwise it would stay open for the life of the process.
     * Failures are logged, never thrown.
     */
    public void shutdown() {
        try {
            this.ss.close();
            this.clear();
            this.interrupt();
            this.join();
        }
        catch (InterruptedException e) {
            this.LOG.warn("Interrupted", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            this.LOG.error("Unexpected exception", (Throwable)e);
        }
        finally {
            // Only release the selector when the loop can no longer be using it.
            if (!this.isAlive()) {
                try {
                    this.selector.close();
                }
                catch (IOException e) {
                    this.LOG.warn("Failed to close selector", (Throwable)e);
                }
            }
        }
    }

    public class Cnxn {
        // Client channel; set to null by close(), after which doIO() returns immediately.
        private SocketChannel sock;
        // Selection key passed to the constructor together with sock.
        private SelectionKey sk;
        // Read by doIO() when the output queue is empty; never assigned in this file.
        boolean initialized;
        // 4-byte buffer that receives the length prefix of the next request.
        ByteBuffer lenBuffer;
        // Buffer currently being filled: either lenBuffer or a payload buffer from readLength().
        ByteBuffer incomingBuffer;
        // Responses queued by sendBuffers() and drained by doIO().
        LinkedBlockingQueue<ByteBuffer> outgoingBuffers;
        // Returned by getSessionTimeout(); never assigned in this file.
        int sessionTimeout;
        // Decremented by sendResponse() and compared against the factory's outstandingLimit.
        int outstandingRequests;
        // Cached textual description of the socket; may be null until getPeerName() is called.
        String peerName;
        // Set by close(); makes further close() and sendResponse() calls no-ops.
        boolean closed;
        // Per-connection packet counters.
        private CnxnStats cnxnStats;

        /**
         * Performs the pending read and/or write for this connection.
         *
         * <p>Read side: fills the current incoming buffer; once it is full it is either
         * decoded as a length prefix ({@code readLength}) or dispatched as a request
         * ({@code readRequest}), after which the length buffer becomes current again. An
         * end-of-stream closes the connection.
         *
         * <p>Write side: copies as much queued output as fits into the factory's direct
         * buffer, writes it, advances or removes the queued buffers by the number of bytes
         * actually written, and finally sets or clears write interest depending on whether
         * output remains.
         *
         * <p>{@code CancelledKeyException} and {@code IOException} close the connection.
         *
         * @param k the selection key reported ready for this connection
         * @throws InterruptedException declared by the signature; not thrown by the visible code
         */
        void doIO(SelectionKey k) throws InterruptedException {
            try {
                if (this.sock == null) {
                    return;
                }
                if (k.isReadable()) {
                    final int rc = this.sock.read(this.incomingBuffer);
                    if (rc < 0) {
                        NIOServerFactory.this.LOG.info("Peer closed connection. rc={} {}", rc, this.sock);
                        this.close();
                        return;
                    }
                    if (!this.incomingBuffer.hasRemaining()) {
                        this.incomingBuffer.flip();
                        if (this.incomingBuffer != this.lenBuffer) {
                            ++this.cnxnStats.packetsReceived;
                            ServerStats.getInstance().incrementPacketsReceived();
                            try {
                                this.readRequest();
                            } finally {
                                // Whatever happened, the next bytes start a new length prefix.
                                this.lenBuffer.clear();
                                this.incomingBuffer = this.lenBuffer;
                            }
                        } else {
                            this.readLength(k);
                        }
                    }
                }
                if (!k.isWritable()) {
                    return;
                }
                if (!this.outgoingBuffers.isEmpty()) {
                    final ByteBuffer staging = NIOServerFactory.this.directBuffer;
                    staging.clear();
                    for (ByteBuffer queued : this.outgoingBuffers) {
                        ByteBuffer chunk = queued;
                        if (staging.remaining() < chunk.remaining()) {
                            // Only part of this buffer fits: copy from a limited slice.
                            chunk = (ByteBuffer) chunk.slice().limit(staging.remaining());
                        }
                        final int start = chunk.position();
                        staging.put(chunk);
                        // Restore the position; consumption is accounted for after the write.
                        chunk.position(start);
                        if (staging.remaining() == 0) {
                            break;
                        }
                    }
                    staging.flip();
                    int written = this.sock.write(staging);
                    while (!this.outgoingBuffers.isEmpty()) {
                        final ByteBuffer head = this.outgoingBuffers.peek();
                        if (head == closeConn) {
                            throw new IOException("closing");
                        }
                        final int unsent = head.remaining() - written;
                        if (unsent > 0) {
                            // Partially written: skip the bytes that went out and stop.
                            head.position(head.position() + written);
                            break;
                        }
                        ++this.cnxnStats.packetsSent;
                        written -= head.remaining();
                        ServerStats.getInstance().incrementPacketsSent();
                        this.outgoingBuffers.remove();
                    }
                }
                synchronized (this) {
                    if (this.outgoingBuffers.isEmpty()) {
                        if (!this.initialized && (this.sk.interestOps() & SelectionKey.OP_READ) == 0) {
                            throw new IOException("Responded to info probe");
                        }
                        this.sk.interestOps(this.sk.interestOps() & ~SelectionKey.OP_WRITE);
                    } else {
                        this.sk.interestOps(this.sk.interestOps() | SelectionKey.OP_WRITE);
                    }
                }
            } catch (CancelledKeyException e) {
                this.close();
            } catch (IOException e) {
                this.close();
            }
        }

        /**
         * Replaces the incoming buffer with a slice of itself and hands that slice, together
         * with this connection, to the factory's packet processor.
         *
         * @throws IOException declared by the signature
         */
        private void readRequest() throws IOException {
            final ByteBuffer payload = this.incomingBuffer.slice();
            this.incomingBuffer = payload;
            NIOServerFactory.this.processor.processPacket(payload, this);
        }

        /** Removes read interest from this connection's selection key. */
        public void disableRecv() {
            final int current = this.sk.interestOps();
            this.sk.interestOps(current & ~SelectionKey.OP_READ);
        }

        /**
         * Adds read interest to this connection's selection key, provided the key is still
         * valid and read interest is not already set.
         */
        public void enableRecv() {
            if (!this.sk.isValid()) {
                return;
            }
            final int interest = this.sk.interestOps();
            if ((interest & SelectionKey.OP_READ) == 0) {
                this.sk.interestOps(interest | SelectionKey.OP_READ);
            }
        }

        /**
         * Decodes the length prefix held in the length buffer and allocates a payload buffer
         * of that size as the new incoming buffer.
         *
         * @param k the selection key of this connection (unused here)
         * @throws IOException if the length is negative or greater than 1048575
         */
        private void readLength(SelectionKey k) throws IOException {
            final int len = this.lenBuffer.getInt();
            final boolean acceptable = len >= 0 && len <= 0xFFFFF;
            if (!acceptable) {
                throw new IOException("Len error " + len);
            }
            this.incomingBuffer = ByteBuffer.allocate(len);
        }

        /**
         * Returns the stored session timeout.
         *
         * @return the current value of the {@code sessionTimeout} field
         */
        public int getSessionTimeout() {
            final int timeout = this.sessionTimeout;
            return timeout;
        }

        /**
         * Wraps an accepted channel and its selection key.
         *
         * <p>Applies the configured TCP_NODELAY setting, enables SO_LINGER with a value of 2,
         * and sets the key's interest set to read only. The connection starts out expecting a
         * 4-byte length prefix.
         *
         * <p>The peer name is captured eagerly whenever debug logging is enabled, because
         * {@code sendBuffers} logs it at debug level; capturing it only at trace level made
         * those debug messages print {@code null}.
         *
         * @param sock the accepted client channel
         * @param sk the selection key under which {@code sock} is registered
         * @throws IOException if a socket option cannot be applied
         */
        public Cnxn(SocketChannel sock, SelectionKey sk) throws IOException {
            // A freshly allocated buffer is already cleared, so no extra clear() is needed.
            this.lenBuffer = ByteBuffer.allocate(4);
            this.incomingBuffer = this.lenBuffer;
            this.outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
            this.peerName = null;
            this.cnxnStats = new CnxnStats();
            this.sock = sock;
            this.sk = sk;
            sock.socket().setTcpNoDelay(NIOServerFactory.this.conf.getServerTcpNoDelay());
            sock.socket().setSoLinger(true, 2);
            sk.interestOps(SelectionKey.OP_READ);
            if (NIOServerFactory.this.LOG.isDebugEnabled()) {
                this.peerName = sock.socket().toString();
            }
        }

        /**
         * Describes this connection by its channel and selection key.
         *
         * @return a string naming the socket channel and the selection key
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("NIOServerCnxn object with sock = ");
            text.append(this.sock);
            text.append(" and sk = ");
            text.append(this.sk);
            return text.toString();
        }

        /**
         * Returns the cached peer description, computing it from the socket on first use.
         *
         * @return the string form of the underlying socket
         */
        public String getPeerName() {
            String name = this.peerName;
            if (name == null) {
                name = this.sock.socket().toString();
                this.peerName = name;
            }
            return name;
        }

        /**
         * Closes this connection; calls after the first one return immediately.
         *
         * <p>The connection is removed from the factory's connection set, both directions of
         * the socket are shut down (failures ignored, since the peer may already have closed
         * them), the socket and the channel are closed, and the selection key is cancelled.
         * Close failures are logged with a message identifying the socket or channel involved.
         */
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            synchronized (NIOServerFactory.this.cnxns) {
                NIOServerFactory.this.cnxns.remove(this);
            }
            NIOServerFactory.this.LOG.debug("close NIOServerCnxn: {}", (Object)this.sock);
            try {
                this.sock.socket().shutdownOutput();
            }
            catch (IOException e) {
                // Best effort: the output side may already be shut down.
            }
            try {
                this.sock.socket().shutdownInput();
            }
            catch (IOException e) {
                // Best effort: the input side may already be shut down.
            }
            try {
                this.sock.socket().close();
            }
            catch (IOException e) {
                NIOServerFactory.this.LOG.error("Failed to close socket of " + this.sock, (Throwable)e);
            }
            try {
                this.sock.close();
            }
            catch (IOException e) {
                NIOServerFactory.this.LOG.error("Failed to close socket channel " + this.sock, (Throwable)e);
            }
            this.sock = null;
            if (this.sk != null) {
                try {
                    this.sk.cancel();
                }
                catch (Exception exception) {
                    // Best effort: the connection is being discarded either way.
                }
            }
        }

        /**
         * Wakes the factory's selector and, if the given key is still valid, adds write
         * interest to it. Runtime exceptions are logged and rethrown.
         *
         * @param sk the selection key to mark as interested in writes
         */
        private void makeWritable(SelectionKey sk) {
            try {
                NIOServerFactory.this.selector.wakeup();
                if (!sk.isValid()) {
                    return;
                }
                final int ops = sk.interestOps();
                sk.interestOps(ops | SelectionKey.OP_WRITE);
            } catch (RuntimeException e) {
                NIOServerFactory.this.LOG.error("Problem setting writable", (Throwable)e);
                throw e;
            }
        }

        /**
         * Queues a response for writing: a 4-byte length prefix holding the total number of
         * remaining bytes in the non-null buffers, followed by those buffers in order. The
         * selection key is then marked writable.
         *
         * @param bb the response buffers; {@code null} elements are skipped
         */
        private void sendBuffers(ByteBuffer[] bb) {
            int total = 0;
            for (ByteBuffer part : bb) {
                if (part != null) {
                    total += part.remaining();
                }
            }
            NIOServerFactory.this.LOG.debug("Sending response of size {} to {}", total, this.peerName);
            final ByteBuffer prefix = ByteBuffer.allocate(4);
            prefix.putInt(total);
            prefix.flip();
            this.outgoingBuffers.add(prefix);
            for (ByteBuffer part : bb) {
                if (part != null) {
                    this.outgoingBuffers.add(part);
                }
            }
            this.makeWritable(this.sk);
        }

        /**
         * Queues a response on this connection and decrements the outstanding-request count.
         * Does nothing if the connection is already closed.
         *
         * <p>Afterwards, holding the factory's monitor and then this connection's monitor, the
         * selector is woken and reads are re-enabled if the outstanding count is below the
         * factory's limit.
         *
         * @param bb the buffers that make up the response
         */
        public void sendResponse(ByteBuffer ... bb) {
            synchronized (this) {
                if (this.closed) {
                    return;
                }
                this.sendBuffers(bb);
                this.outstandingRequests -= 1;
            }
            synchronized (NIOServerFactory.this) {
                synchronized (this) {
                    if (this.outstandingRequests >= NIOServerFactory.this.outstandingLimit) {
                        return;
                    }
                    this.sk.selector().wakeup();
                    this.enableRecv();
                }
            }
        }

        /**
         * Returns the remote socket address of this connection's socket.
         *
         * @return the socket's remote socket address, cast to {@link InetSocketAddress}
         */
        public InetSocketAddress getRemoteAddress() {
            final Object remote = this.sock.socket().getRemoteSocketAddress();
            return (InetSocketAddress) remote;
        }

        /**
         * Returns the statistics object of this connection.
         *
         * @return the per-connection packet counters
         */
        public CnxnStats getStats() {
            final CnxnStats current = this.cnxnStats;
            return current;
        }

        /** Packet counters for the enclosing connection, with a one-line textual summary. */
        private class CnxnStats {
            /** Incremented by {@code doIO} for each queued buffer that was written completely. */
            long packetsSent;
            /** Incremented by {@code doIO} for each complete request read. */
            long packetsReceived;

            private CnxnStats() {
                this.packetsSent = 0L;
                this.packetsReceived = 0L;
            }

            /**
             * Reads the enclosing connection's outstanding-request count while holding the
             * connection's monitor.
             *
             * @return the outstanding-request count
             */
            public long getOutstandingRequests() {
                synchronized (Cnxn.this) {
                    return Cnxn.this.outstandingRequests;
                }
            }

            /** @return the number of packets received */
            public long getPacketsReceived() {
                final long received = this.packetsReceived;
                return received;
            }

            /** @return the number of packets sent */
            public long getPacketsSent() {
                final long sent = this.packetsSent;
                return sent;
            }

            /**
             * Builds a summary line with the remote address, the key's interest ops in hex,
             * and the queued/received/sent counters. The result is empty when the key's
             * channel is not a {@code SocketChannel}.
             *
             * @return the summary line, terminated by a newline, or an empty string
             */
            @Override
            public String toString() {
                final StringBuilder sb = new StringBuilder();
                final SelectableChannel channel = Cnxn.this.sk.channel();
                if (!(channel instanceof SocketChannel)) {
                    return sb.toString();
                }
                sb.append(" ");
                sb.append(((SocketChannel) channel).socket().getRemoteSocketAddress());
                sb.append("[");
                sb.append(Integer.toHexString(Cnxn.this.sk.interestOps()));
                sb.append("](queued=");
                sb.append(this.getOutstandingRequests());
                sb.append(",recved=");
                sb.append(this.getPacketsReceived());
                sb.append(",sent=");
                sb.append(this.getPacketsSent());
                sb.append(")\n");
                return sb.toString();
            }
        }
    }

    public static interface PacketProcessor {
        public void processPacket(ByteBuffer var1, Cnxn var2);
    }
}

