/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport.netty;

import java.io.IOException;
import java.io.StreamCorruptedException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.HandlesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.buffer.ChannelBuffers;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
import org.elasticsearch.common.netty.channel.ChannelStateEvent;
import org.elasticsearch.common.netty.channel.ExceptionEvent;
import org.elasticsearch.common.netty.channel.MessageEvent;
import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler;
import org.elasticsearch.common.netty.channel.WriteCompletionEvent;
import org.elasticsearch.common.netty.handler.codec.frame.TooLongFrameException;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.ResponseHandlerFailureTransportException;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportSerializationException;
import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.elasticsearch.transport.support.TransportStreams;

public class MessageChannelHandler
extends SimpleChannelUpstreamHandler {
    private final ESLogger logger;
    private final ThreadPool threadPool;
    private final TransportServiceAdapter transportServiceAdapter;
    private final NettyTransport transport;
    private ChannelBuffer cumulation;
    private static final long NINETY_PER_HEAP_SIZE = (long)((double)JvmInfo.jvmInfo().mem().heapMax().bytes() * 0.9);

    public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
        this.threadPool = transport.threadPool();
        this.transportServiceAdapter = transport.transportServiceAdapter();
        this.transport = transport;
        this.logger = logger;
    }

    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        this.transportServiceAdapter.sent(e.getWrittenAmount());
        super.writeComplete(ctx, e);
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }
        ChannelBuffer input = (ChannelBuffer)m;
        if (!input.readable()) {
            return;
        }
        ChannelBuffer cumulation = this.cumulation;
        if (cumulation != null && cumulation.readable()) {
            cumulation.discardReadBytes();
            cumulation.writeBytes(input);
            this.callDecode(ctx, e.getChannel(), cumulation, true);
        } else {
            int actualSize = this.callDecode(ctx, e.getChannel(), input, false);
            if (input.readable()) {
                cumulation = actualSize > 0 ? ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory()) : ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
                cumulation.writeBytes(input);
                this.cumulation = cumulation;
            }
        }
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.cleanup(ctx, e);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.cleanup(ctx, e);
    }

    private int callDecode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, boolean cumulationBuffer) throws Exception {
        int actualSize = 0;
        while (buffer.readable()) {
            actualSize = 0;
            if (buffer.readableBytes() < 4) break;
            int dataLen = buffer.getInt(buffer.readerIndex());
            if (dataLen <= 0) {
                throw new StreamCorruptedException("invalid data length: " + dataLen);
            }
            if ((long)dataLen > NINETY_PER_HEAP_SIZE) {
                throw new TooLongFrameException("transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
            }
            actualSize = dataLen + 4;
            if (buffer.readableBytes() < actualSize) break;
            buffer.skipBytes(4);
            this.process(ctx, channel, buffer, dataLen);
        }
        if (cumulationBuffer) {
            if (!buffer.readable()) {
                this.cumulation = null;
            } else if (buffer.readerIndex() > 0) {
                this.cumulation = actualSize > 0 ? ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory()) : ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
                this.cumulation.writeBytes(buffer);
            }
        }
        return actualSize;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        try {
            ChannelBuffer cumulation = this.cumulation;
            if (cumulation == null) {
                return;
            }
            this.cumulation = null;
            if (cumulation.readable()) {
                this.callDecode(ctx, ctx.getChannel(), cumulation, true);
            }
        }
        finally {
            ctx.sendUpstream(e);
        }
    }

    private void process(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int size) throws Exception {
        HandlesStreamInput wrappedStream;
        boolean hasBytesToRead;
        this.transportServiceAdapter.received(size + 4);
        int markedReaderIndex = buffer.readerIndex();
        int expectedIndexReader = markedReaderIndex + size;
        StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
        long requestId = buffer.readLong();
        byte status = buffer.readByte();
        boolean isRequest = TransportStreams.statusIsRequest(status);
        boolean bl = hasBytesToRead = size - 9 != 0;
        if (TransportStreams.statusIsCompress(status) && hasBytesToRead && buffer.readable()) {
            Compressor compressor = CompressorFactory.compressor(buffer);
            if (compressor == null) {
                int maxToRead = Math.min(buffer.readableBytes(), 10);
                int offset = buffer.readerIndex();
                StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
                for (int i = 0; i < maxToRead; ++i) {
                    sb.append(buffer.getByte(offset + i)).append(",");
                }
                sb.append("]");
                throw new ElasticSearchIllegalStateException(sb.toString());
            }
            wrappedStream = CachedStreamInput.cachedHandlesCompressed(compressor, streamIn);
        } else {
            wrappedStream = CachedStreamInput.cachedHandles(streamIn);
        }
        if (isRequest) {
            String action = this.handleRequest(channel, wrappedStream, requestId);
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    this.logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
                } else {
                    this.logger.warn("Message read past expected size (request) for [{}] and action [{}], resetting", requestId, action);
                }
                buffer.readerIndex(expectedIndexReader);
            }
        } else {
            TransportResponseHandler handler = this.transportServiceAdapter.remove(requestId);
            if (handler != null) {
                if (TransportStreams.statusIsError(status)) {
                    this.handlerResponseError(wrappedStream, handler);
                } else {
                    this.handleResponse(wrappedStream, handler);
                }
            } else {
                buffer.readerIndex(markedReaderIndex + size);
            }
            if (buffer.readerIndex() != expectedIndexReader) {
                if (buffer.readerIndex() < expectedIndexReader) {
                    this.logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
                } else {
                    this.logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStreams.statusIsError(status));
                }
                buffer.readerIndex(expectedIndexReader);
            }
        }
        ((StreamInput)wrappedStream).close();
    }

    private void handleResponse(StreamInput buffer, TransportResponseHandler handler) {
        Object streamable = handler.newInstance();
        try {
            streamable.readFrom(buffer);
        }
        catch (Exception e) {
            this.handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
            return;
        }
        try {
            if (handler.executor() == "same") {
                handler.handleResponse(streamable);
            } else {
                this.threadPool.executor(handler.executor()).execute(new ResponseHandler(handler, (Streamable)streamable));
            }
        }
        catch (Exception e) {
            this.handleException(handler, new ResponseHandlerFailureTransportException(e));
        }
    }

    private void handlerResponseError(StreamInput buffer, TransportResponseHandler handler) {
        Throwable error;
        try {
            ThrowableObjectInputStream ois = new ThrowableObjectInputStream(buffer, this.transport.settings().getClassLoader());
            error = (Throwable)ois.readObject();
        }
        catch (Exception e) {
            error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
        }
        this.handleException(handler, error);
    }

    private void handleException(final TransportResponseHandler handler, Throwable error) {
        if (!(error instanceof RemoteTransportException)) {
            error = new RemoteTransportException(error.getMessage(), error);
        }
        final RemoteTransportException rtx = (RemoteTransportException)error;
        if (handler.executor() == "same") {
            handler.handleException(rtx);
        } else {
            this.threadPool.executor(handler.executor()).execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        handler.handleException(rtx);
                    }
                    catch (Exception e) {
                        MessageChannelHandler.this.logger.error("Failed to handle exception response", e, new Object[0]);
                    }
                }
            });
        }
    }

    private String handleRequest(Channel channel, StreamInput buffer, long requestId) throws IOException {
        String action = buffer.readUTF();
        NettyTransportChannel transportChannel = new NettyTransportChannel(this.transport, action, channel, requestId);
        try {
            TransportRequestHandler handler = this.transportServiceAdapter.handler(action);
            if (handler == null) {
                throw new ActionNotFoundTransportException(action);
            }
            Object streamable = handler.newInstance();
            streamable.readFrom(buffer);
            if (handler.executor() == "same") {
                handler.messageReceived(streamable, transportChannel);
            } else {
                this.threadPool.executor(handler.executor()).execute(new RequestHandler(handler, (Streamable)streamable, transportChannel, action));
            }
        }
        catch (Exception e) {
            try {
                transportChannel.sendResponse(e);
            }
            catch (IOException e1) {
                this.logger.warn("Failed to send error message back to client for action [" + action + "]", e, new Object[0]);
                this.logger.warn("Actual Exception", e1, new Object[0]);
            }
        }
        return action;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        this.transport.exceptionCaught(ctx, e);
    }

    class RequestHandler
    implements Runnable {
        private final TransportRequestHandler handler;
        private final Streamable streamable;
        private final NettyTransportChannel transportChannel;
        private final String action;

        public RequestHandler(TransportRequestHandler handler, Streamable streamable, NettyTransportChannel transportChannel, String action) {
            this.handler = handler;
            this.streamable = streamable;
            this.transportChannel = transportChannel;
            this.action = action;
        }

        @Override
        public void run() {
            block4: {
                try {
                    this.handler.messageReceived(this.streamable, this.transportChannel);
                }
                catch (Throwable e) {
                    if (MessageChannelHandler.this.transport.lifecycleState() != Lifecycle.State.STARTED) break block4;
                    try {
                        this.transportChannel.sendResponse(e);
                    }
                    catch (IOException e1) {
                        MessageChannelHandler.this.logger.warn("Failed to send error message back to client for action [" + this.action + "]", e1, new Object[0]);
                        MessageChannelHandler.this.logger.warn("Actual Exception", e, new Object[0]);
                    }
                }
            }
        }
    }

    class ResponseHandler
    implements Runnable {
        private final TransportResponseHandler handler;
        private final Streamable streamable;

        public ResponseHandler(TransportResponseHandler handler, Streamable streamable) {
            this.handler = handler;
            this.streamable = streamable;
        }

        @Override
        public void run() {
            try {
                this.handler.handleResponse(this.streamable);
            }
            catch (Exception e) {
                MessageChannelHandler.this.handleException(this.handler, new ResponseHandlerFailureTransportException(e));
            }
        }
    }
}

