/*
 * Decompiled with CFR 0.152.
 */
package hidden.bkjournal.org.apache.bookkeeper.proto;

import hidden.bkjournal.org.apache.bookkeeper.conf.ClientConfiguration;
import hidden.bkjournal.org.apache.bookkeeper.proto.BookieProtocol;
import hidden.bkjournal.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import hidden.bkjournal.org.apache.bookkeeper.util.OrderedSafeExecutor;
import hidden.bkjournal.org.apache.bookkeeper.util.SafeRunnable;
import hidden.bkjournal.org.jboss.netty.bootstrap.ClientBootstrap;
import hidden.bkjournal.org.jboss.netty.buffer.ChannelBuffer;
import hidden.bkjournal.org.jboss.netty.buffer.ChannelBuffers;
import hidden.bkjournal.org.jboss.netty.channel.Channel;
import hidden.bkjournal.org.jboss.netty.channel.ChannelFuture;
import hidden.bkjournal.org.jboss.netty.channel.ChannelFutureListener;
import hidden.bkjournal.org.jboss.netty.channel.ChannelHandlerContext;
import hidden.bkjournal.org.jboss.netty.channel.ChannelPipeline;
import hidden.bkjournal.org.jboss.netty.channel.ChannelPipelineCoverage;
import hidden.bkjournal.org.jboss.netty.channel.ChannelPipelineFactory;
import hidden.bkjournal.org.jboss.netty.channel.ChannelStateEvent;
import hidden.bkjournal.org.jboss.netty.channel.Channels;
import hidden.bkjournal.org.jboss.netty.channel.ExceptionEvent;
import hidden.bkjournal.org.jboss.netty.channel.MessageEvent;
import hidden.bkjournal.org.jboss.netty.channel.SimpleChannelHandler;
import hidden.bkjournal.org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import hidden.bkjournal.org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import hidden.bkjournal.org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import hidden.bkjournal.org.jboss.netty.handler.codec.frame.TooLongFrameException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelPipelineCoverage(value="one")
public class PerChannelBookieClient
extends SimpleChannelHandler
implements ChannelPipelineFactory {
    /** Logger shared by this class and its anonymous inner classes. */
    static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
    /** One fifth of the JVM's reported maximum memory; not referenced elsewhere in this file. */
    static final long maxMemory = Runtime.getRuntime().maxMemory() / 5L;
    /**
     * Maximum frame length (0x200000 = 2,097,152 bytes) handed to the
     * {@link LengthFieldBasedFrameDecoder} built in {@link #getPipeline()}.
     * NOTE(review): public and non-final, so it can be reassigned at runtime.
     */
    public static int MAX_FRAME_LENGTH = 0x200000;
    /** Address of the bookie this client connects to. */
    InetSocketAddress addr;
    /** Semaphore created with 2000 permits; not acquired or released anywhere in this file. */
    Semaphore opCounterSem = new Semaphore(2000);
    /** Counter supplied by the constructor; only stored here, never updated in this file. */
    AtomicLong totalBytesOutstanding;
    /** Factory used to build the {@link ClientBootstrap} in {@code connect()}. */
    ClientSocketChannelFactory channelFactory;
    /** Executor on which response handling and error callbacks are submitted, keyed by ledger id. */
    OrderedSafeExecutor executor;
    /** Outstanding add requests, keyed by (ledgerId, entryId), awaiting a response or an error. */
    ConcurrentHashMap<CompletionKey, AddCompletion> addCompletions = new ConcurrentHashMap();
    /** Outstanding read requests, keyed by (ledgerId, entryId), awaiting a response or an error. */
    ConcurrentHashMap<CompletionKey, ReadCompletion> readCompletions = new ConcurrentHashMap();
    /**
     * Operations queued while no connection is established. Appended to and swapped out
     * while holding this object's monitor; the swapped-out queue is drained outside the lock.
     */
    Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
    /** Current channel to the bookie; null until a connect succeeds and reset to null when a connect fails. */
    Channel channel = null;
    /** Connection state; starts as DISCONNECTED and is updated by connect() and the channel callbacks. */
    private ConnectionState state;
    /** Client configuration; read for the tcpNoDelay option when connecting. */
    private final ClientConfiguration conf;

    /**
     * Creates a client that uses a default-constructed {@link ClientConfiguration}.
     * Delegates to the five-argument constructor.
     *
     * @param executor executor used to run callbacks
     * @param channelFactory factory used to open the channel to the bookie
     * @param addr address of the bookie
     * @param totalBytesOutstanding shared counter stored on this instance
     */
    public PerChannelBookieClient(OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
        this(new ClientConfiguration(), executor, channelFactory, addr, totalBytesOutstanding);
    }

    /**
     * Creates a client for a single bookie. No connection is opened here; the
     * instance starts in the DISCONNECTED state.
     *
     * @param conf client configuration (consulted for the tcpNoDelay option on connect)
     * @param executor executor used to run callbacks
     * @param channelFactory factory used to open the channel to the bookie
     * @param addr address of the bookie
     * @param totalBytesOutstanding shared counter stored on this instance
     */
    public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, ClientSocketChannelFactory channelFactory, InetSocketAddress addr, AtomicLong totalBytesOutstanding) {
        // Collaborators first ...
        this.conf = conf;
        this.executor = executor;
        this.channelFactory = channelFactory;

        // ... then the target and shared accounting ...
        this.addr = addr;
        this.totalBytesOutstanding = totalBytesOutstanding;

        // ... and finally the initial connection state.
        this.state = ConnectionState.DISCONNECTED;
    }

    /**
     * Starts an asynchronous connection attempt to {@code addr} unless one is
     * already in flight (state CONNECTING).
     *
     * <p>When the attempt completes, {@code channel} and {@code state} are updated
     * while holding this object's monitor, and the queue of pending operations is
     * swapped for a fresh one. The swapped-out operations are then completed
     * outside the lock with result code 0 on success or -8 on failure
     * (presumably a "bookie handle not available" code -- confirm against BKException).
     */
    private synchronized void connect() {
        if (this.state == ConnectionState.CONNECTING) {
            // A connect attempt is already running; its listener will drain pendingOps.
            return;
        }
        this.state = ConnectionState.CONNECTING;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to bookie: " + this.addr);
        }
        ClientBootstrap bootstrap = new ClientBootstrap(this.channelFactory);
        bootstrap.setPipelineFactory(this);
        bootstrap.setOption("tcpNoDelay", this.conf.getClientTcpNoDelay());
        bootstrap.setOption("keepAlive", true);
        ChannelFuture future = bootstrap.connect(this.addr);
        future.addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                final Queue<BookkeeperInternalCallbacks.GenericCallback<Void>> oldPendingOps;
                final int rc;
                synchronized (PerChannelBookieClient.this) {
                    if (future.isSuccess()) {
                        LOG.info("Successfully connected to bookie: " + PerChannelBookieClient.this.addr);
                        rc = 0;
                        PerChannelBookieClient.this.channel = future.getChannel();
                        PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    } else {
                        // Include the failure cause so the reason for the failed connect is not lost.
                        LOG.error("Could not connect to bookie: " + PerChannelBookieClient.this.addr, future.getCause());
                        rc = -8;
                        PerChannelBookieClient.this.channel = null;
                        PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                    }
                    // Take ownership of the queued operations and install an empty queue,
                    // so that the callbacks below run without holding the lock.
                    oldPendingOps = PerChannelBookieClient.this.pendingOps;
                    PerChannelBookieClient.this.pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<Void>>();
                }
                for (BookkeeperInternalCallbacks.GenericCallback<Void> pendingOp : oldPendingOps) {
                    pendingOp.operationComplete(rc, null);
                }
            }
        });
    }

    /**
     * Runs {@code op} immediately if a connection is established; otherwise queues
     * it and triggers a connection attempt.
     *
     * <p>The connected check is first made without the lock and, if it fails,
     * repeated while holding this object's monitor before the operation is queued.
     * When run immediately, {@code op} is invoked outside the lock with result code 0.
     *
     * @param op operation to run once a connection is available
     */
    void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<Void> op) {
        boolean connected = this.channel != null && this.state == ConnectionState.CONNECTED;
        if (!connected) {
            synchronized (this) {
                connected = this.channel != null && this.state == ConnectionState.CONNECTED;
                if (!connected) {
                    // Still not connected: park the operation; the connect listener completes it.
                    this.pendingOps.add(op);
                    this.connect();
                    return;
                }
            }
        }
        op.operationComplete(0, null);
    }

    /**
     * Sends an add-entry request over the current channel and registers the callback
     * to be completed when the response (or a write failure) arrives.
     *
     * <p>The frame is a 4-byte length, a 4-byte packet header, the master key and
     * then the entry payload. The length field counts everything after itself.
     *
     * @param ledgerId ledger the entry belongs to
     * @param masterKey master key written after the packet header
     * @param entryId id of the entry being added
     * @param toSend entry payload; its readable bytes are sent as-is
     * @param cb callback completed with the outcome of the add
     * @param ctx opaque context handed back to {@code cb}
     * @param options flags narrowed to a short and placed in the packet header
     */
    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ChannelBuffer toSend, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, int options) {
        final int payloadLength = toSend.readableBytes();
        final CompletionKey pendingKey = new CompletionKey(ledgerId, entryId);
        // Register before writing so a response can always find its completion.
        this.addCompletions.put(pendingKey, new AddCompletion(cb, payloadLength, ctx));

        // length prefix (4) + packet header (4) + master key
        final int headerLength = 4 + 4 + masterKey.length;
        ChannelBuffer headerBuf = this.channel.getConfig().getBufferFactory().getBuffer(headerLength);
        headerBuf.writeInt(headerLength - 4 + payloadLength);
        BookieProtocol.PacketHeader packetHeader = new BookieProtocol.PacketHeader(1, 1, (short)options);
        headerBuf.writeInt(packetHeader.toInt());
        headerBuf.writeBytes(masterKey);

        ChannelBuffer frame = ChannelBuffers.wrappedBuffer(headerBuf, toSend);
        this.channel.write(frame).addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // The request never left; fail the registered completion.
                    PerChannelBookieClient.this.errorOutAddKey(pendingKey);
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully wrote request for adding entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + PerChannelBookieClient.this.channel.getRemoteAddress() + " entry length: " + payloadLength);
                }
            }
        });
    }

    /**
     * Sends a read-entry request over the current channel and registers the callback
     * to be completed when the response (or a write failure) arrives.
     *
     * <p>The frame is 24 bytes: a 4-byte length (value 20), a 4-byte packet header,
     * the 8-byte ledger id and the 8-byte entry id.
     *
     * @param ledgerId ledger to read from
     * @param entryId id of the entry to read
     * @param cb callback completed with the outcome of the read
     * @param ctx opaque context handed back to {@code cb}
     * @param options flags narrowed to a short and placed in the packet header
     */
    public void readEntry(final long ledgerId, final long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx, int options) {
        final CompletionKey pendingKey = new CompletionKey(ledgerId, entryId);
        // Register before writing so a response can always find its completion.
        this.readCompletions.put(pendingKey, new ReadCompletion(cb, ctx));

        // length prefix (4) + packet header (4) + ledger id (8) + entry id (8)
        final int frameLength = 4 + 4 + 8 + 8;
        ChannelBuffer request = this.channel.getConfig().getBufferFactory().getBuffer(frameLength);
        request.writeInt(frameLength - 4);
        BookieProtocol.PacketHeader packetHeader = new BookieProtocol.PacketHeader(1, 2, (short)options);
        request.writeInt(packetHeader.toInt());
        request.writeLong(ledgerId);
        request.writeLong(entryId);

        this.channel.write(request).addListener(new ChannelFutureListener(){

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // The request never left; fail the registered completion.
                    PerChannelBookieClient.this.errorOutReadKey(pendingKey);
                    return;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully wrote request for reading entry: " + entryId + " ledger-id: " + ledgerId + " bookie: " + PerChannelBookieClient.this.channel.getRemoteAddress());
                }
            }
        });
    }

    /**
     * Closes the current channel, if there is one, and waits uninterruptibly for
     * the close to finish. Does nothing when no channel has been established.
     */
    public void close() {
        if (this.channel == null) {
            return;
        }
        ChannelFuture closing = this.channel.close();
        closing.awaitUninterruptibly();
    }

    /**
     * Fails the outstanding read registered under {@code key}, if it is still present.
     *
     * <p>The work is submitted to the executor keyed by the ledger id. If the
     * completion has already been removed (for example by a response), nothing
     * happens. Otherwise the read callback is completed with result code -8.
     *
     * @param key (ledgerId, entryId) of the read request to fail
     */
    void errorOutReadKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                ReadCompletion readCompletion = PerChannelBookieClient.this.readCompletions.remove(key);
                if (readCompletion != null) {
                    // channel is reset to null when a connect attempt fails; guard it (as
                    // errorOutAddKey does) so that logging cannot prevent the callback from running.
                    Channel current = PerChannelBookieClient.this.channel;
                    String bAddress = current == null ? "null" : String.valueOf(current.getRemoteAddress());
                    LOG.error("Could not write  request for reading entry: " + key.entryId + " ledger-id: " + key.ledgerId + " bookie: " + bAddress);
                    readCompletion.cb.readEntryComplete(-8, key.ledgerId, key.entryId, null, readCompletion.ctx);
                }
            }
        });
    }

    /**
     * Fails the outstanding add registered under {@code key}, if it is still present.
     *
     * <p>The work is submitted to the executor keyed by the ledger id. If the
     * completion has already been removed (for example by a response), nothing
     * happens. Otherwise the write callback is completed with result code -8.
     *
     * @param key (ledgerId, entryId) of the add request to fail
     */
    void errorOutAddKey(final CompletionKey key) {
        this.executor.submitOrdered(key.ledgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                AddCompletion addCompletion = PerChannelBookieClient.this.addCompletions.remove(key);
                if (addCompletion != null) {
                    // Read the field once, and tolerate both a null channel and a null remote
                    // address (an unconnected channel may report none), so that building the
                    // log message can never throw and skip the callback below.
                    Channel current = PerChannelBookieClient.this.channel;
                    String bAddress = current == null ? "null" : String.valueOf(current.getRemoteAddress());
                    LOG.error("Could not write request for adding entry: " + key.entryId + " ledger-id: " + key.ledgerId + " bookie: " + bAddress);
                    addCompletion.cb.writeComplete(-8, key.ledgerId, key.entryId, PerChannelBookieClient.this.addr, addCompletion.ctx);
                    LOG.error("Invoked callback method: " + key.entryId);
                }
            }
        });
    }

    /**
     * Schedules failure of every request that is currently outstanding: first all
     * registered adds, then all registered reads. Each key is handed to the matching
     * errorOut method, which does the actual removal and callback on the executor.
     */
    void errorOutOutstandingEntries() {
        for (CompletionKey pendingAdd : this.addCompletions.keySet()) {
            errorOutAddKey(pendingAdd);
        }

        for (CompletionKey pendingRead : this.readCompletions.keySet()) {
            errorOutReadKey(pendingRead);
        }
    }

    /**
     * Builds the channel pipeline: a length-field frame decoder followed by this
     * object as the main handler.
     *
     * <p>The decoder reads a 4-byte length field at offset 0, applies no length
     * adjustment, strips those 4 bytes from the delivered frame and rejects frames
     * longer than {@link #MAX_FRAME_LENGTH}.
     *
     * @return the newly created pipeline
     */
    @Override
    public ChannelPipeline getPipeline() throws Exception {
        LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4);

        ChannelPipeline result = Channels.pipeline();
        result.addLast("lengthbasedframedecoder", frameDecoder);
        result.addLast("mainhandler", this);
        return result;
    }

    /**
     * Handles loss of the connection: schedules failure of all outstanding requests,
     * closes the current channel and marks this client as DISCONNECTED.
     *
     * <p>No reconnect is attempted here; the next call to
     * {@code connectIfNeededAndDoOp} triggers one.
     */
    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        LOG.info("Disconnected from bookie: " + this.addr);
        this.errorOutOutstandingEntries();
        // channel is reset to null when a connect attempt fails, so guard the dereference.
        Channel current = this.channel;
        if (current != null) {
            current.close();
        }
        // Every other access to state happens under this object's monitor (connect(),
        // its completion listener, connectIfNeededAndDoOp); write it under the same lock
        // so the transition is visible to those readers.
        synchronized (this) {
            this.state = ConnectionState.DISCONNECTED;
        }
    }

    /**
     * Handles exceptions raised in the channel pipeline.
     *
     * <ul>
     *   <li>Corrupted or over-long frames are logged with the remote address.</li>
     *   <li>{@link IOException}s are deliberately not logged here.</li>
     *   <li>Anything else is logged as unexpected, with its stack trace.</li>
     * </ul>
     * The event is never forwarded further up the pipeline.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        Throwable t = e.getCause();
        if (t instanceof CorruptedFrameException || t instanceof TooLongFrameException) {
            LOG.error("Corrupted frame received from bookie: " + e.getChannel().getRemoteAddress());
            return;
        }
        if (t instanceof IOException) {
            // Intentionally ignored. NOTE(review): presumably I/O failures are already
            // reported through the write listeners and channelDisconnected -- confirm.
            return;
        }
        LOG.error("Unexpected exception caught by bookie client channel handler", t);
    }

    /**
     * Parses a response frame and dispatches it on the executor, keyed by ledger id.
     *
     * <p>Messages that are not {@link ChannelBuffer}s are passed upstream untouched.
     * A response starts with a 4-byte packet header, a 4-byte result code, the 8-byte
     * ledger id and the 8-byte entry id. Frames too short to hold these are logged
     * and dropped. Op code 1 is routed to the add handler and op code 2 to the read
     * handler (which also receives the rest of the buffer); other op codes are logged
     * and ignored.
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        final Object message = e.getMessage();
        if (!(message instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }
        final ChannelBuffer buffer = (ChannelBuffer)message;

        final BookieProtocol.PacketHeader responseHeader;
        final int responseCode;
        final long responseLedgerId;
        final long responseEntryId;
        try {
            // Field order on the wire: header, rc, ledger id, entry id.
            responseHeader = BookieProtocol.PacketHeader.fromInt(buffer.readInt());
            responseCode = buffer.readInt();
            responseLedgerId = buffer.readLong();
            responseEntryId = buffer.readLong();
        }
        catch (IndexOutOfBoundsException ex) {
            LOG.error("Unparseable response from bookie: " + this.addr, (Throwable)ex);
            return;
        }

        this.executor.submitOrdered(responseLedgerId, new SafeRunnable(){

            @Override
            public void safeRun() {
                int opCode = responseHeader.getOpCode();
                if (opCode == 1) {
                    PerChannelBookieClient.this.handleAddResponse(responseLedgerId, responseEntryId, responseCode);
                } else if (opCode == 2) {
                    PerChannelBookieClient.this.handleReadResponse(responseLedgerId, responseEntryId, responseCode, buffer);
                } else {
                    LOG.error("Unexpected response, type: " + responseHeader.getOpCode() + " received from bookie: " + PerChannelBookieClient.this.addr + " , ignoring");
                }
            }
        });
    }

    /**
     * Completes the add request registered for (ledgerId, entryId) with a result
     * code translated from the bookie's response code.
     *
     * <p>Translation: 0 stays 0, 103 becomes -16, 104 becomes -101, and anything
     * else is logged and becomes -12. NOTE(review): these look like BookieProtocol
     * response codes mapped to BKException codes -- confirm against those classes.
     * A response with no matching registered request is logged and ignored.
     *
     * @param ledgerId ledger id from the response
     * @param entryId entry id from the response
     * @param rc response code sent by the bookie
     */
    void handleAddResponse(long ledgerId, long entryId, int rc) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for add request from bookie: " + this.addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + rc);
        }

        final int result;
        if (rc == 0) {
            result = 0;
        } else if (rc == 103) {
            result = -16;
        } else if (rc == 104) {
            result = -101;
        } else {
            LOG.error("Add for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + this.addr + " with code: " + rc);
            result = -12;
        }

        AddCompletion pending = this.addCompletions.remove(new CompletionKey(ledgerId, entryId));
        if (pending != null) {
            pending.cb.writeComplete(result, ledgerId, entryId, this.addr, pending.ctx);
            return;
        }
        LOG.error("Unexpected add response received from bookie: " + this.addr + " for ledger: " + ledgerId + ", entry: " + entryId + " , ignoring");
    }

    /**
     * Completes the read request registered for (ledgerId, entryId) with a result
     * code translated from the bookie's response code and a slice of the remaining
     * response bytes.
     *
     * <p>Translation: 0 stays 0, 1 and 2 become -13, 103 becomes -16, and anything
     * else is logged and becomes -1. NOTE(review): these look like BookieProtocol
     * response codes mapped to BKException codes -- confirm against those classes.
     *
     * <p>If nothing is registered under the exact key, a request registered under
     * entry id -1 for the same ledger is used instead (presumably a request made with
     * a sentinel entry id -- confirm with callers). If neither exists, the response
     * is logged and ignored.
     *
     * @param ledgerId ledger id from the response
     * @param entryId entry id from the response
     * @param rc response code sent by the bookie
     * @param buffer response buffer positioned after the fixed response fields
     */
    void handleReadResponse(long ledgerId, long entryId, int rc, ChannelBuffer buffer) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got response for read request from bookie: " + this.addr + " for ledger: " + ledgerId + " entry: " + entryId + " rc: " + rc + "entry length: " + buffer.readableBytes());
        }

        final int result;
        switch (rc) {
            case 0: {
                result = 0;
                break;
            }
            case 1:
            case 2: {
                result = -13;
                break;
            }
            case 103: {
                result = -16;
                break;
            }
            default: {
                LOG.error("Read for ledger: " + ledgerId + ", entry: " + entryId + " failed on bookie: " + this.addr + " with code: " + rc);
                result = -1;
            }
        }

        ReadCompletion pending = this.readCompletions.remove(new CompletionKey(ledgerId, entryId));
        if (pending == null) {
            // Fall back to a request registered under entry id -1 for this ledger.
            pending = this.readCompletions.remove(new CompletionKey(ledgerId, -1L));
        }
        if (pending != null) {
            pending.cb.readEntryComplete(result, ledgerId, entryId, buffer.slice(), pending.ctx);
            return;
        }
        LOG.error("Unexpected read response received from bookie: " + this.addr + " for ledger: " + ledgerId + ", entry: " + entryId + " , ignoring");
    }

    /**
     * Map key identifying an outstanding request by its ledger id and entry id.
     * Two keys are equal exactly when both ids match.
     */
    static class CompletionKey {
        long ledgerId;
        long entryId;

        /**
         * @param ledgerId id of the ledger
         * @param entryId id of the entry within the ledger
         */
        CompletionKey(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Returns true only for another {@code CompletionKey} with the same ledger and entry ids. */
        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is required.
            if (obj instanceof CompletionKey) {
                CompletionKey other = (CompletionKey)obj;
                return other.ledgerId == this.ledgerId && other.entryId == this.entryId;
            }
            return false;
        }

        /** Combines the low 32 bits of the ledger id, shifted left by 16, with the low 32 bits of the entry id. */
        @Override
        public int hashCode() {
            int ledgerBits = (int)this.ledgerId << 16;
            int entryBits = (int)this.entryId;
            return ledgerBits ^ entryBits;
        }

        /** Renders the key as {@code LedgerEntry(ledgerId, entryId)}. */
        @Override
        public String toString() {
            Object[] ids = new Object[]{this.ledgerId, this.entryId};
            return String.format("LedgerEntry(%d, %d)", ids);
        }
    }

    /**
     * Holder for the callback and caller context of an outstanding add request.
     */
    static class AddCompletion {
        /** Callback completed when the add succeeds or fails. */
        final BookkeeperInternalCallbacks.WriteCallback cb;
        /** Opaque caller context passed back to {@link #cb}. */
        final Object ctx;

        /**
         * @param cb callback to complete
         * @param size size of the entry; accepted but not stored by this class
         * @param ctx opaque caller context
         */
        public AddCompletion(BookkeeperInternalCallbacks.WriteCallback cb, long size, Object ctx) {
            this.cb = cb;
            this.ctx = ctx;
        }
    }

    /**
     * Holder for the callback and caller context of an outstanding read request.
     */
    static class ReadCompletion {
        /** Callback completed when the read succeeds or fails. */
        final BookkeeperInternalCallbacks.ReadEntryCallback cb;
        /** Opaque caller context passed back to {@link #cb}. */
        final Object ctx;

        /**
         * @param cb callback to complete
         * @param ctx opaque caller context
         */
        public ReadCompletion(BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
            this.cb = cb;
            this.ctx = ctx;
        }
    }

    /**
     * Connection lifecycle of this client as tracked by the {@code state} field.
     */
    private static enum ConnectionState {
        /** No usable connection; initial state, and the state after a failed connect or a disconnect. */
        DISCONNECTED,
        /** A connect attempt has been started and has not completed yet. */
        CONNECTING,
        /** The connect attempt succeeded and {@code channel} has been set. */
        CONNECTED;

    }
}

