/*
 * Decompiled with CFR 0.152.
 */
package com.mapr.client.impl.rpc;

import com.google.common.collect.ImmutableList;
import com.mapr.client.Config;
import com.mapr.client.impl.annotations.Owned;
import com.mapr.client.impl.annotations.Shared;
import com.mapr.client.impl.annotations.SynchronizeOn;
import com.mapr.client.impl.rpc.HostAndPort;
import com.mapr.client.impl.rpc.NettyHelper;
import com.mapr.client.impl.rpc.RpcBinding;
import com.mapr.client.impl.rpc.RpcMessage;
import com.mapr.client.impl.rpc.RpcMsgDecoder;
import com.mapr.client.impl.rpc.RpcMsgEncoder;
import com.mapr.client.impl.rpc.RpcMsgLengthDecoder;
import com.mapr.client.impl.rpc.RpcResponse;
import com.mapr.client.impl.rpc.ServiceHosts;
import com.mapr.client.impl.util.Cleanup;
import com.stumbleupon.async.Deferred;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.concurrent.GenericFutureListener;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcConnection
extends MessageToMessageDecoder<RpcResponse> {
    private static final Logger logger = LoggerFactory.getLogger(RpcConnection.class);
    private final int myConnId;
    private final AtomicLong sequence;
    @SynchronizeOn(value="outstandingRPCs")
    private final Long2ObjectOpenHashMap<RpcMessage> outstandingRPCs = new Long2ObjectOpenHashMap();
    @Shared
    protected final RpcBinding binding;
    @Owned
    private final ChannelInitializer<SocketChannel> channelInitializer;
    @Owned
    private List<Channel> channels;
    private int connectionTimeout;

    public RpcConnection(final RpcBinding binding) {
        this.binding = binding;
        this.myConnId = binding.nextCallId();
        this.sequence = new AtomicLong(0L);
        this.connectionTimeout = Config.getRpcConnectionTimeoutMs();
        this.channelInitializer = new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) throws Exception {
                logger.debug("Client SocketChannel {} initialized", (Object)ch);
                ChannelPipeline pipe = ch.pipeline();
                pipe.addLast("rpc-length-decoder", (ChannelHandler)new RpcMsgLengthDecoder());
                pipe.addLast("rpc-decoder", RpcMsgDecoder.get(binding.isSecure()));
                pipe.addLast("msg-decoder", (ChannelHandler)RpcConnection.this);
                pipe.addLast("rpc-encoder", RpcMsgEncoder.get(binding.isSecure()));
            }
        };
    }

    public void connect(ServiceHosts serviceHosts) {
        ImmutableList.Builder channelList = ImmutableList.builder();
        for (HostAndPort serviceHost : serviceHosts) {
            try {
                Bootstrap bootstrap = NettyHelper.newClientBootstrap(this.binding.getClientEventLoopGroup(), this.channelInitializer);
                logger.debug("Connecting to {}:{}", (Object)serviceHost.resolvedHost(), (Object)serviceHost.getPort());
                ChannelFuture f = ((Bootstrap)bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.connectionTimeout)).connect(serviceHost.resolvedHost(), serviceHost.getPort()).awaitUninterruptibly();
                if (f.isSuccess()) {
                    channelList.add((Object)f.channel());
                    continue;
                }
                if (f.isCancelled()) {
                    logger.debug("Connection to {}:{} was cancelled", (Object)serviceHost.resolvedHost(), (Object)serviceHost.getPort());
                    continue;
                }
                logger.warn(f.cause().getMessage(), f.cause());
            }
            catch (Exception e) {
                logger.warn(e.getMessage(), (Throwable)e);
                throw new RuntimeException(e);
            }
        }
        this.channels = channelList.build();
    }

    public void close() {
        if (this.channels != null) {
            for (Channel channel : this.channels) {
                Cleanup.Cleaner.close(channel);
            }
        }
    }

    public List<Channel> getChannels() {
        return this.channels;
    }

    public long nextSequenceId() {
        return this.sequence.incrementAndGet();
    }

    public int getConnId() {
        return this.myConnId;
    }

    public Deferred<Object> send(RpcMessage msg) {
        ChannelFuture channelFuture = this.getAvailableChannel().writeAndFlush((Object)msg);
        this.addToInflightRpc(msg);
        channelFuture.addListener((GenericFutureListener)ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
        return msg.getDeferred();
    }

    protected void decode(ChannelHandlerContext ctx, RpcResponse response, List<Object> out) throws Exception {
        long sequenceId = response.getSequenceId();
        RpcMessage reqMsg = this.getAndRemoveRpc(sequenceId);
        if (reqMsg == null) {
            throw new IllegalStateException("Rcvd a response without an outstanding request: " + response);
        }
        Object result = reqMsg.parseResponse(response);
        response.release();
        reqMsg.callback(result);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error(cause.getMessage(), cause);
    }

    private Channel getAvailableChannel() {
        return this.channels.get(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void addToInflightRpc(RpcMessage msg) {
        Long2ObjectOpenHashMap<RpcMessage> long2ObjectOpenHashMap = this.outstandingRPCs;
        synchronized (long2ObjectOpenHashMap) {
            this.outstandingRPCs.put(msg.getSequenceId(), (Object)msg);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private RpcMessage getAndRemoveRpc(long sequenceId) {
        Long2ObjectOpenHashMap<RpcMessage> long2ObjectOpenHashMap = this.outstandingRPCs;
        synchronized (long2ObjectOpenHashMap) {
            return (RpcMessage)this.outstandingRPCs.remove(sequenceId);
        }
    }
}

