/*
 * Decompiled with CFR 0.152.
 */
package voldemort.server.niosocket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.RequestFormatType;
import voldemort.server.protocol.RequestHandler;
import voldemort.server.protocol.RequestHandlerFactory;
import voldemort.server.protocol.StreamRequestHandler;
import voldemort.utils.ByteBufferBackedInputStream;
import voldemort.utils.ByteBufferBackedOutputStream;
import voldemort.utils.ByteUtils;

/**
 * Per-connection handler for the NIO socket server. It owns one input buffer
 * and one output buffer for a single {@link SocketChannel} and, each time
 * {@link #run()} is invoked, performs either a read or a write depending on
 * the readiness of the channel's {@link SelectionKey}.
 *
 * <p>
 * Life cycle of a connection as implemented here:
 * <ol>
 * <li>The first bytes received are inspected for a three-byte protocol
 * proposal. A recognised proposal is answered with {@code "ok"}; an
 * unrecognised one makes the handler fall back to
 * {@link RequestFormatType#VOLDEMORT_V0} and treat the bytes as a request.</li>
 * <li>Bytes are accumulated until the {@link RequestHandler} reports a
 * complete request, which is then executed against the buffers.</li>
 * <li>If executing the request yields a {@link StreamRequestHandler}, that
 * handler is driven repeatedly until it reports completion.</li>
 * <li>The key's interest is switched to writing until the output buffer has
 * been drained, then back to reading.</li>
 * </ol>
 */
public class AsyncRequestHandler implements Runnable {

    /** Selector used to look up this channel's selection key in {@link #run()}. */
    private final Selector selector;

    /** The client connection served by this handler. */
    private final SocketChannel socketChannel;

    /** Supplies the {@link RequestHandler} once the wire protocol is known. */
    private final RequestHandlerFactory requestHandlerFactory;

    /** Initial capacity of both buffers, and the capacity they are reset to. */
    private final int socketBufferSize;

    /**
     * Twice {@link #socketBufferSize}. A buffer whose capacity has reached this
     * value is replaced by a fresh {@code socketBufferSize} buffer instead of
     * merely being cleared.
     */
    private final int resizeThreshold;

    /** Stream view over the buffer that socket reads are placed into. */
    private final ByteBufferBackedInputStream inputStream;

    /** Stream view over the buffer that responses are staged in. */
    private final ByteBufferBackedOutputStream outputStream;

    /** Null until the protocol has been determined by {@link #initRequestHandler}. */
    private RequestHandler requestHandler;

    /** Non-null only while a streaming request is in progress. */
    private StreamRequestHandler streamRequestHandler;

    private final Logger logger = Logger.getLogger(this.getClass());

    /**
     * Creates a handler for a newly accepted connection and allocates its
     * input and output buffers.
     *
     * @param selector selector the channel's key is looked up from in {@link #run()}
     * @param socketChannel client connection to serve
     * @param requestHandlerFactory factory used to obtain the protocol-specific
     *        request handler
     * @param socketBufferSize initial capacity, in bytes, of each buffer
     */
    public AsyncRequestHandler(Selector selector, SocketChannel socketChannel, RequestHandlerFactory requestHandlerFactory, int socketBufferSize) {
        this.selector = selector;
        this.socketChannel = socketChannel;
        this.requestHandlerFactory = requestHandlerFactory;
        this.socketBufferSize = socketBufferSize;
        this.resizeThreshold = socketBufferSize * 2;
        this.inputStream = new ByteBufferBackedInputStream(ByteBuffer.allocate(socketBufferSize));
        this.outputStream = new ByteBufferBackedOutputStream(ByteBuffer.allocate(socketBufferSize));
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Accepting remote connection from " + socketChannel.socket().getRemoteSocketAddress());
        }
    }

    /**
     * Performs one read or one write on the channel according to the state of
     * its selection key. Any failure closes the connection; nothing is
     * propagated to the caller. Interrupted channels, cancelled keys and
     * end-of-stream are closed silently, everything else is logged at ERROR
     * first.
     */
    public void run() {
        SelectionKey selectionKey = this.socketChannel.keyFor(this.selector);
        try {
            if (selectionKey.isReadable()) {
                this.read(selectionKey);
            } else if (selectionKey.isWritable()) {
                this.write(selectionKey);
            } else if (!selectionKey.isValid()) {
                throw new IllegalStateException("Selection key not valid for " + this.socketChannel.socket().getRemoteSocketAddress());
            } else {
                throw new IllegalStateException("Unknown state, not readable, writable, or valid for " + this.socketChannel.socket().getRemoteSocketAddress());
            }
        }
        catch (ClosedByInterruptException e) {
            this.close(selectionKey);
        }
        catch (CancelledKeyException e) {
            this.close(selectionKey);
        }
        catch (EOFException e) {
            this.close(selectionKey);
        }
        catch (Throwable t) {
            if (this.logger.isEnabledFor(Level.ERROR)) {
                this.logger.error(t.getMessage(), t);
            }
            this.close(selectionKey);
        }
    }

    /**
     * Reads whatever is available from the socket into the input buffer and
     * advances the request as far as the buffered data allows.
     *
     * @param selectionKey key of this channel, used to switch to write interest
     * @throws EOFException if the peer has closed the connection
     * @throws IOException if the socket read or request handling fails
     */
    private void read(SelectionKey selectionKey) throws IOException {
        int count = this.socketChannel.read(this.inputStream.getBuffer());
        if (count == -1) {
            throw new EOFException("EOF for " + this.socketChannel.socket().getRemoteSocketAddress());
        }
        if (this.logger.isTraceEnabled()) {
            this.traceInputBufferState("Read " + count + " bytes");
        }
        if (count == 0) {
            return;
        }

        // Remember where the next socket read has to continue, then flip so
        // that everything received so far can be consumed from the start.
        int position = this.inputStream.getBuffer().position();
        this.inputStream.getBuffer().flip();

        if (this.requestHandler == null) {
            if (!this.initRequestHandler(selectionKey)) {
                // Protocol accepted; the "ok" reply is already queued.
                return;
            }
            if (this.requestHandler == null) {
                // Fewer than three bytes have arrived, so the protocol cannot
                // be determined yet. Restore the buffer for appending and wait
                // for more data instead of dereferencing the missing handler.
                this.handleIncompleteRequest(position);
                return;
            }
        }

        if (this.streamRequestHandler != null) {
            // A streaming request is already in flight; feed it the new data.
            this.handleStreamRequest(selectionKey);
            return;
        }

        if (!this.requestHandler.isCompleteRequest(this.inputStream.getBuffer())) {
            this.handleIncompleteRequest(position);
            return;
        }

        // The completeness check may have moved the position; start over for
        // the actual execution.
        this.inputStream.getBuffer().rewind();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Starting execution for " + this.socketChannel.socket().getRemoteSocketAddress());
        }
        this.streamRequestHandler = this.requestHandler.handleRequest(new DataInputStream(this.inputStream), new DataOutputStream(this.outputStream));
        if (this.streamRequestHandler != null) {
            this.handleStreamRequest(selectionKey);
            return;
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Finished execution for " + this.socketChannel.socket().getRemoteSocketAddress());
        }
        this.prepForWrite(selectionKey);
    }

    /**
     * Writes as much of the output buffer as the socket accepts. Once the
     * buffer is drained it is reset, and the handler either continues a
     * streaming request that is in its writing direction or switches the key
     * back to read interest.
     *
     * @param selectionKey key of this channel
     * @throws IOException if the socket write or stream handling fails
     */
    private void write(SelectionKey selectionKey) throws IOException {
        if (this.outputStream.getBuffer().hasRemaining()) {
            int count = this.socketChannel.write(this.outputStream.getBuffer());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Wrote " + count + " bytes, remaining: " + this.outputStream.getBuffer().remaining() + " for " + this.socketChannel.socket().getRemoteSocketAddress());
            }
        } else if (this.logger.isTraceEnabled()) {
            this.logger.trace("Wrote no bytes for " + this.socketChannel.socket().getRemoteSocketAddress());
        }

        if (this.outputStream.getBuffer().hasRemaining()) {
            // Partial write; stay interested in writes and finish next time.
            return;
        }

        // Everything was sent. Shrink a grown buffer back to the default size,
        // otherwise just reuse it.
        if (this.outputStream.getBuffer().capacity() >= this.resizeThreshold) {
            this.outputStream.setBuffer(ByteBuffer.allocate(this.socketBufferSize));
        } else {
            this.outputStream.getBuffer().clear();
        }

        if (this.streamRequestHandler != null && this.streamRequestHandler.getDirection() == StreamRequestHandler.StreamRequestDirection.WRITING) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Request is streaming for " + this.socketChannel.socket().getRemoteSocketAddress());
            }
            this.handleStreamRequest(selectionKey);
        } else {
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Drives the active {@link StreamRequestHandler} for as long as it can make
     * progress with the currently buffered data, then arranges for the next
     * read or write.
     *
     * @param selectionKey key of this channel
     * @throws IOException if the stream handler fails while closing or
     *         reporting an error
     */
    private void handleStreamRequest(SelectionKey selectionKey) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(this.inputStream);
        DataOutputStream dataOutputStream = new DataOutputStream(this.outputStream);

        // Input position before the most recent handler invocation, so that a
        // partially consumed record can be replayed after more data arrives.
        int preRequestPosition = this.inputStream.getBuffer().position();
        StreamRequestHandler.StreamRequestHandlerState state = this.handleStreamRequestInternal(selectionKey, dataInputStream, dataOutputStream);

        if (state == StreamRequestHandler.StreamRequestHandlerState.READING) {
            // Keep consuming buffered records until the handler stops asking
            // for more.
            do {
                preRequestPosition = this.inputStream.getBuffer().position();
                state = this.handleStreamRequestInternal(selectionKey, dataInputStream, dataOutputStream);
            } while (state == StreamRequestHandler.StreamRequestHandlerState.READING);
        } else if (state == StreamRequestHandler.StreamRequestHandlerState.WRITING) {
            // Keep producing output until the handler changes state or
            // wasExpanded() reports true (presumably: the output buffer had to
            // grow), then flush what has been staged.
            do {
                state = this.handleStreamRequestInternal(selectionKey, dataInputStream, dataOutputStream);
            } while (state == StreamRequestHandler.StreamRequestHandlerState.WRITING && !this.outputStream.wasExpanded());

            // A null state means the handler failed and the connection has
            // already been closed; touching the cancelled key again would only
            // raise CancelledKeyException. COMPLETE is handled further down.
            if (state != null && state != StreamRequestHandler.StreamRequestHandlerState.COMPLETE) {
                this.prepForWrite(selectionKey);
            }
        }

        if (state == null) {
            // Failure path: handleStreamRequestInternal cleaned up already.
            return;
        }

        if (state == StreamRequestHandler.StreamRequestHandlerState.INCOMPLETE_READ) {
            // Discard what was fully processed, keep the partial record at the
            // front of the buffer, and continue appending behind it.
            int currentPosition = this.inputStream.getBuffer().position();
            this.inputStream.getBuffer().position(preRequestPosition);
            this.inputStream.getBuffer().compact();
            this.handleIncompleteRequest(currentPosition - preRequestPosition);
        } else if (state == StreamRequestHandler.StreamRequestHandlerState.COMPLETE) {
            this.streamRequestHandler.close(dataOutputStream);
            this.streamRequestHandler = null;
            this.prepForWrite(selectionKey);
        }
    }

    /**
     * Invokes the stream handler once. If the handler throws, the error is
     * handed to {@link StreamRequestHandler#handleError}, the stream handler is
     * closed and dropped, and the connection is closed.
     *
     * @param selectionKey key of this channel
     * @param dataInputStream stream over the input buffer
     * @param dataOutputStream stream over the output buffer
     * @return the state reported by the handler, or {@code null} if the handler
     *         threw and the connection was closed
     * @throws IOException if reporting the error or closing the handler fails
     */
    private StreamRequestHandler.StreamRequestHandlerState handleStreamRequestInternal(SelectionKey selectionKey, DataInputStream dataInputStream, DataOutputStream dataOutputStream) throws IOException {
        StreamRequestHandler.StreamRequestHandlerState state = null;
        try {
            if (this.logger.isTraceEnabled()) {
                this.traceInputBufferState("Before streaming request handler");
            }
            state = this.streamRequestHandler.handleRequest(dataInputStream, dataOutputStream);
            if (this.logger.isTraceEnabled()) {
                this.traceInputBufferState("After streaming request handler");
            }
        }
        catch (Exception e) {
            if (this.logger.isEnabledFor(Level.WARN)) {
                this.logger.warn(e.getMessage(), e);
            }
            VoldemortException error = e instanceof VoldemortException ? (VoldemortException)e : new VoldemortException(e);
            this.streamRequestHandler.handleError(dataOutputStream, error);
            this.streamRequestHandler.close(dataOutputStream);
            this.streamRequestHandler = null;
            // NOTE(review): the error response is staged for writing and the
            // connection is then closed right away, so the staged bytes do not
            // appear to be sent by this class. Confirm this is intended.
            this.prepForWrite(selectionKey);
            this.close(selectionKey);
        }
        return state;
    }

    /**
     * Ends the read phase: resets the input buffer (shrinking it if it has
     * grown to the resize threshold), flips the output buffer so its contents
     * can be written, and registers write interest on the key.
     *
     * @param selectionKey key of this channel
     */
    private void prepForWrite(SelectionKey selectionKey) {
        if (this.logger.isTraceEnabled()) {
            this.traceInputBufferState("About to clear read buffer");
        }
        if (this.inputStream.getBuffer().capacity() >= this.resizeThreshold) {
            this.inputStream.setBuffer(ByteBuffer.allocate(this.socketBufferSize));
        } else {
            this.inputStream.getBuffer().clear();
        }
        if (this.logger.isTraceEnabled()) {
            this.traceInputBufferState("Cleared read buffer");
        }
        this.outputStream.getBuffer().flip();
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Puts the input buffer back into "append" mode after a request turned out
     * to be incomplete, doubling its capacity if it is already full.
     *
     * @param newPosition index at which the next socket read must continue,
     *        i.e. the number of bytes of the pending request already buffered
     */
    private void handleIncompleteRequest(int newPosition) {
        if (this.logger.isTraceEnabled()) {
            this.traceInputBufferState("Incomplete read request detected, before update");
        }
        this.inputStream.getBuffer().position(newPosition);
        this.inputStream.getBuffer().limit(this.inputStream.getBuffer().capacity());
        if (this.logger.isTraceEnabled()) {
            this.traceInputBufferState("Incomplete read request detected, after update");
        }
        if (!this.inputStream.getBuffer().hasRemaining()) {
            this.inputStream.setBuffer(ByteUtils.expand(this.inputStream.getBuffer(), this.inputStream.getBuffer().capacity() * 2));
            if (this.logger.isTraceEnabled()) {
                this.traceInputBufferState("Expanded input buffer");
            }
        }
    }

    /**
     * Closes the socket and the channel and cancels the selection key. Each
     * step is attempted independently; a failure is logged at WARN and does
     * not prevent the remaining steps.
     *
     * @param selectionKey key of this channel
     */
    private void close(SelectionKey selectionKey) {
        if (this.logger.isInfoEnabled()) {
            this.logger.info("Closing remote connection from " + this.socketChannel.socket().getRemoteSocketAddress());
        }
        try {
            this.socketChannel.socket().close();
        }
        catch (IOException e) {
            if (this.logger.isEnabledFor(Level.WARN)) {
                this.logger.warn(e.getMessage(), e);
            }
        }
        try {
            this.socketChannel.close();
        }
        catch (IOException e) {
            if (this.logger.isEnabledFor(Level.WARN)) {
                this.logger.warn(e.getMessage(), e);
            }
        }
        try {
            selectionKey.attach(null);
            selectionKey.cancel();
        }
        catch (Exception e) {
            if (this.logger.isEnabledFor(Level.WARN)) {
                this.logger.warn(e.getMessage(), e);
            }
        }
    }

    /**
     * Tries to determine the wire protocol from the first three buffered bytes
     * and to install the matching {@link RequestHandler}.
     *
     * @param selectionKey key of this channel, used to queue the {@code "ok"}
     *        reply
     * @return {@code false} if a protocol proposal was recognised and the
     *         {@code "ok"} reply has been queued, so the caller must not
     *         process the buffer any further; {@code true} otherwise. On
     *         {@code true} the request handler is either the
     *         {@link RequestFormatType#VOLDEMORT_V0} fallback or, when fewer
     *         than three bytes are buffered, still {@code null}.
     */
    private boolean initRequestHandler(SelectionKey selectionKey) {
        ByteBuffer inputBuffer = this.inputStream.getBuffer();
        int remaining = inputBuffer.remaining();
        if (remaining < 3) {
            // Not enough data yet to tell which protocol is being proposed.
            return true;
        }
        byte[] protoBytes = new byte[]{inputBuffer.get(0), inputBuffer.get(1), inputBuffer.get(2)};
        try {
            String proto = ByteUtils.getString(protoBytes, "UTF-8");
            RequestFormatType requestFormatType = RequestFormatType.fromCode(proto);
            this.requestHandler = this.requestHandlerFactory.getRequestHandler(requestFormatType);
            if (this.logger.isInfoEnabled()) {
                this.logger.info("Protocol negotiated for " + this.socketChannel.socket().getRemoteSocketAddress() + ": " + requestFormatType.getDisplayName());
            }
            this.outputStream.getBuffer().put(ByteUtils.getBytes("ok", "UTF-8"));
            this.prepForWrite(selectionKey);
            return false;
        }
        catch (IllegalArgumentException e) {
            // The bytes are not a known protocol code: assume the original
            // protocol and let the caller treat them as request data.
            RequestFormatType requestFormatType = RequestFormatType.VOLDEMORT_V0;
            this.requestHandler = this.requestHandlerFactory.getRequestHandler(requestFormatType);
            if (this.logger.isInfoEnabled()) {
                this.logger.info("No protocol proposal given for " + this.socketChannel.socket().getRemoteSocketAddress() + ", assuming " + requestFormatType.getDisplayName());
            }
            return true;
        }
    }

    /**
     * Logs position, limit, remaining and capacity of the input buffer at
     * TRACE level. Callers check {@code isTraceEnabled()} first.
     *
     * @param preamble text placed in front of the buffer figures
     */
    private void traceInputBufferState(String preamble) {
        this.logger.trace(preamble + " - position: " + this.inputStream.getBuffer().position() + ", limit: " + this.inputStream.getBuffer().limit() + ", remaining: " + this.inputStream.getBuffer().remaining() + ", capacity: " + this.inputStream.getBuffer().capacity() + " - for " + this.socketChannel.socket().getRemoteSocketAddress());
    }
}

