package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.storm.netty.bootstrap.ClientBootstrap;
import org.apache.storm.netty.channel.Channel;
import org.apache.storm.netty.channel.ChannelFactory;
import org.apache.storm.netty.channel.ChannelFuture;
import org.apache.storm.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/messaging/netty/Client.class */
/**
 * Netty-based outbound connection to a single remote address.
 *
 * <p>Messages handed to {@link #send(Iterator)} are accumulated in a {@link MessageBatch} and
 * written to the channel when the batch is full, or when the channel is writable at the end of a
 * {@code send} call. If the channel is not writable at that point, a periodic task scheduled in
 * the constructor flushes the leftover batch later.
 *
 * <p>State-changing methods ({@code connect}, {@code send}, {@code flush}, {@code close}) are
 * {@code synchronized} on this instance. Write-completion callbacks update {@code pendings} and
 * {@code channelRef} through atomics without taking that lock.
 */
public class Client implements IConnection {
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    private static final String PREFIX = "Netty-Client-";

    /** Longest time {@link #close()} waits for in-flight batches before giving up. */
    private static final long CLOSE_TIMEOUT_MS = 600000L;
    /** Sleep between two checks of the pending counter while closing. */
    private static final long CLOSE_POLL_INTERVAL_MS = 1000L;
    /** Upper bound for the initial delay of the periodic flush check. */
    private static final long MAX_INITIAL_FLUSH_DELAY_MS = 30000L;

    private final int max_retries;
    private final int base_sleep_ms;
    private final int max_sleep_ms;
    private final StormBoundedExponentialBackoffRetry retryPolicy;
    private final ClientBootstrap bootstrap;
    private final InetSocketAddress remote_addr;
    private final ChannelFactory factory;
    private final int buffer_size;
    private final int messageBatchSize;
    private final int flushCheckInterval;
    private final ScheduledExecutorService scheduler;

    /** Batch currently being filled; {@code null} when nothing is buffered. Guarded by {@code this}. */
    MessageBatch messageBatch = null;
    /** Current channel, or {@code null} after a failed write dropped it. */
    private final AtomicReference<Channel> channelRef = new AtomicReference<>(null);
    /** Set once by {@link #close()}; afterwards no new requests are accepted. */
    private boolean closing = false;
    /** Number of batches written to the channel whose write future has not completed yet. */
    private final AtomicLong pendings = new AtomicLong(0);
    /**
     * Wall-clock time (ms) after which the periodic task should flush the leftover batch;
     * {@code Long.MAX_VALUE} means no flush is requested.
     */
    private final AtomicLong flushCheckTimer = new AtomicLong(Long.MAX_VALUE);

    /**
     * Creates the client, submits an initial connect to the scheduler and schedules the periodic
     * flush check.
     *
     * @param map storm configuration; the Netty buffer size, retry count, min/max sleep, batch
     *     size (default 262144) and flush check interval (default 10 ms) are read from it
     * @param channelFactory factory used to build the {@link ClientBootstrap}
     * @param scheduledExecutorService executor running the initial connect and the flush check
     * @param str remote host
     * @param i remote port
     */
    public Client(Map map, ChannelFactory channelFactory, ScheduledExecutorService scheduledExecutorService, String str, int i) {
        this.factory = channelFactory;
        this.scheduler = scheduledExecutorService;
        this.buffer_size = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
        this.max_retries = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
        this.base_sleep_ms = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
        this.max_sleep_ms = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
        this.retryPolicy = new StormBoundedExponentialBackoffRetry(this.base_sleep_ms, this.max_sleep_ms, this.max_retries);
        this.messageBatchSize = Utils.getInt(map.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
        this.flushCheckInterval = Utils.getInt(map.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
        LOG.info("New Netty Client, connect to " + str + ", " + i + ", config: , buffer_size: " + this.buffer_size);

        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("sendBufferSize", Integer.valueOf(this.buffer_size));
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
        this.remote_addr = new InetSocketAddress(str, i);

        // Connect in the background so construction does not block on the network.
        scheduledExecutorService.execute(new Runnable() {
            @Override
            public void run() {
                Client.this.connect();
            }
        });

        // Multiply as long: the int product of two config values can overflow.
        long initialDelayMs = Math.min(MAX_INITIAL_FLUSH_DELAY_MS, (long) this.max_sleep_ms * this.max_retries);
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                if (Client.this.closing) {
                    return;
                }
                // Nothing to do until send() has requested a deferred flush and its deadline passed.
                if (System.currentTimeMillis() <= Client.this.flushCheckTimer.get()) {
                    return;
                }
                Channel channel = Client.this.channelRef.get();
                if (channel == null || !channel.isWritable()) {
                    return;
                }
                Client.this.flush(channel);
            }
        }, initialDelayMs, this.flushCheckInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Ensures there is a connected channel, making up to {@code max_retries + 1} connection
     * attempts and sleeping between failed attempts as dictated by the retry policy.
     *
     * <p>Does nothing when the current channel is already connected. When every attempt fails the
     * client is closed and a {@link RuntimeException} is thrown.
     *
     * @throws RuntimeException if the remote address cannot be reached, or if the thread is
     *     interrupted while waiting between attempts (the interrupt flag is restored)
     */
    public synchronized void connect() {
        Channel current = this.channelRef.get();
        if (current != null && current.isConnected()) {
            return;
        }
        Channel established = null;
        try {
            for (int attempt = 0; attempt <= this.max_retries; attempt++) {
                LOG.info("Reconnect started for {}... [{}]", name(), attempt);
                LOG.debug("connection started...");
                ChannelFuture future = this.bootstrap.connect(this.remote_addr);
                future.awaitUninterruptibly();
                Channel candidate = future.getChannel();
                if (future.isSuccess()) {
                    established = candidate;
                    break;
                }
                // Release the channel of the failed attempt before retrying.
                if (candidate != null) {
                    candidate.close();
                }
                Thread.sleep(this.retryPolicy.getSleepTimeMs(attempt, 0L));
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("connection failed " + name(), e);
        }
        if (established == null) {
            close();
            throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
        }
        LOG.info("connection established to a remote host " + name() + ", " + established.toString());
        this.channelRef.set(established);
    }

    /**
     * Buffers the given messages and writes every batch that becomes full. A partially filled
     * batch left at the end is written immediately if the channel is writable; otherwise a
     * deferred flush is requested from the periodic flush check.
     *
     * @param it messages to send; {@code null} or an empty iterator is a no-op
     * @throws RuntimeException if the client is closing, or if (re)connecting fails
     */
    @Override
    public synchronized void send(Iterator<TaskMessage> it) {
        if (this.closing) {
            throw new RuntimeException("Client is being closed, and does not take requests any more");
        }
        if (it == null || !it.hasNext()) {
            return;
        }

        Channel channel = this.channelRef.get();
        if (channel == null) {
            connect();
            channel = this.channelRef.get();
        }

        while (it.hasNext()) {
            // The connection may drop while we are batching; re-establish before continuing.
            if (!channel.isConnected()) {
                connect();
                channel = this.channelRef.get();
            }
            TaskMessage message = it.next();
            if (this.messageBatch == null) {
                this.messageBatch = new MessageBatch(this.messageBatchSize);
            }
            this.messageBatch.add(message);
            if (this.messageBatch.isFull()) {
                flushRequest(channel, this.messageBatch);
                this.messageBatch = null;
            }
        }

        if (this.messageBatch == null || this.messageBatch.isEmpty()) {
            return;
        }
        if (channel.isWritable()) {
            // Flushing now, so cancel any deferred flush request.
            this.flushCheckTimer.set(Long.MAX_VALUE);
            MessageBatch toBeFlushed = this.messageBatch;
            this.messageBatch = null;
            flushRequest(channel, toBeFlushed);
        } else {
            // Let the periodic task flush once the check interval has elapsed.
            this.flushCheckTimer.set(System.currentTimeMillis() + this.flushCheckInterval);
        }
    }

    /**
     * @return {@code "Netty-Client-"} followed by the remote address, or an empty string when no
     *     remote address is set
     */
    public String name() {
        if (this.remote_addr == null) {
            return "";
        }
        return PREFIX + this.remote_addr.toString();
    }

    /**
     * Writes the currently buffered batch, if any, to the given channel and clears the deferred
     * flush request. Invoked by the periodic flush check.
     *
     * @param channel channel to write the buffered batch to
     */
    public synchronized void flush(Channel channel) {
        if (this.closing || this.messageBatch == null || this.messageBatch.isEmpty()) {
            return;
        }
        MessageBatch toBeFlushed = this.messageBatch;
        this.flushCheckTimer.set(Long.MAX_VALUE);
        flushRequest(channel, toBeFlushed);
        this.messageBatch = null;
    }

    /**
     * Closes the client. The buffered batch (if any) is written when a channel is available, then
     * the method waits, polling once per second for at most 600000 ms, until all pending writes
     * have completed, and finally closes the channel.
     *
     * <p>Calling this method again after the first call has no effect.
     */
    @Override
    public synchronized void close() {
        if (this.closing) {
            return;
        }
        this.closing = true;
        LOG.info("Closing Netty Client " + name());

        if (this.messageBatch != null && !this.messageBatch.isEmpty()) {
            MessageBatch toBeFlushed = this.messageBatch;
            Channel channel = this.channelRef.get();
            if (channel != null) {
                flushRequest(channel, toBeFlushed);
            }
            this.messageBatch = null;
        }

        long start = System.currentTimeMillis();
        LOG.info("Waiting for pending batchs to be sent with " + name() + "..., timeout: {}ms, pendings: {}",
                CLOSE_TIMEOUT_MS, this.pendings.get());

        while (this.pendings.get() != 0) {
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed > CLOSE_TIMEOUT_MS) {
                LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent",
                        name(), this.pendings.get());
                break;
            }
            try {
                Thread.sleep(CLOSE_POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Stop waiting, but restore the flag so the caller can observe the interrupt.
                Thread.currentThread().interrupt();
                break;
            }
        }

        close_n_release();
    }

    /** Closes the current channel, if there is one. */
    private void close_n_release() {
        // Read once: a write-failure callback may null the reference concurrently.
        Channel channel = this.channelRef.get();
        if (channel != null) {
            channel.close();
            LOG.debug("channel {} closed", this.remote_addr);
        }
    }

    /**
     * Not supported on the client side.
     *
     * @throws RuntimeException always
     */
    @Override
    public Iterator<TaskMessage> recv(int i, int i2) {
        throw new RuntimeException("Client connection should not receive any messages");
    }

    /**
     * Wraps the arguments in a single {@link TaskMessage} and delegates to
     * {@link #send(Iterator)}.
     *
     * @param i first {@link TaskMessage} constructor argument
     * @param bArr second {@link TaskMessage} constructor argument (message bytes)
     */
    @Override
    public void send(int i, byte[] bArr) {
        ArrayList<TaskMessage> single = new ArrayList<>(1);
        single.add(new TaskMessage(i, bArr));
        send(single.iterator());
    }

    /**
     * Writes a batch to the channel and tracks it in {@code pendings} until the write future
     * completes. On a failed write the channel is closed and removed from {@code channelRef}
     * (only if it is still the current channel).
     *
     * @param channel channel to write to
     * @param messageBatch batch to write; {@code null} is ignored
     */
    private void flushRequest(Channel channel, final MessageBatch messageBatch) {
        if (messageBatch == null) {
            return;
        }
        this.pendings.incrementAndGet();
        ChannelFuture writeFuture = channel.write(messageBatch);
        writeFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Client.this.pendings.decrementAndGet();
                if (channelFuture.isSuccess()) {
                    Client.LOG.debug("{} request(s) sent", messageBatch.size());
                    return;
                }
                Client.LOG.info("failed to send requests to " + Client.this.remote_addr.toString() + ": ", channelFuture.getCause());
                Channel failed = channelFuture.getChannel();
                if (failed != null) {
                    failed.close();
                    // Do not clobber a newer channel installed by a concurrent reconnect.
                    Client.this.channelRef.compareAndSet(failed, null);
                }
            }
        });
    }
}
