package backtype.storm.messaging.netty;

import backtype.storm.Config;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.TaskMessage;
import backtype.storm.metric.api.IStatefulObject;
import backtype.storm.utils.Utils;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.storm.shade.org.jboss.netty.bootstrap.ServerBootstrap;
import org.apache.storm.shade.org.jboss.netty.channel.Channel;
import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
import org.apache.storm.shade.org.jboss.netty.channel.group.ChannelGroup;
import org.apache.storm.shade.org.jboss.netty.channel.group.DefaultChannelGroup;
import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:backtype/storm/messaging/netty/Server.class */
/**
 * Netty-based receiving endpoint: binds a server socket on a port, accepts
 * batches of {@link TaskMessage}s handed to it via {@link #enqueue}, and
 * serves them to consumers through {@link #recv}.
 *
 * <p>Incoming messages are spread over {@code queueCount} queues. Each task id
 * is assigned to one queue (round-robin on first sight) so that all messages
 * for a task are always delivered through the same queue.
 *
 * <p>This connection is receive-only; both {@code send} variants throw
 * {@link UnsupportedOperationException}.
 */
public class Server extends ConnectionWithStatus implements IStatefulObject {
    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
    Map storm_conf;
    int port;
    /** Per-queue count of messages that were enqueued but not yet handed out by {@link #recv}. */
    private final AtomicInteger[] pendingMessages;
    /** One queue of message batches per receiver; indexed by queue id. */
    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
    final ChannelFactory factory;
    final ServerBootstrap bootstrap;
    private int queueCount;
    /** Task id -> queue id. Replaced wholesale (never mutated in place) when a new task is seen. */
    private volatile HashMap<Integer, Integer> taskToQueueId;
    /** Messages received since the last {@link #getState} call, keyed by the string passed to {@link #enqueue}. */
    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
    /** Messages handed out by {@link #recv} since the last {@link #getState} call. */
    private final AtomicInteger messagesDequeued = new AtomicInteger(0);
    /** All channels owned by this server; set to {@code null} by {@link #close()}. */
    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
    private volatile boolean closing = false;
    /** Sentinel batch (task id -1) returned by {@link #recv} once the server is closing. */
    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
    /** Next queue id to hand out to a previously unseen task; guarded by {@code synchronized (this)}. */
    int roundRobinQueueId = 0;

    /**
     * Creates the server, configures the Netty bootstrap from the given
     * configuration and binds to the given port.
     *
     * @param map configuration map; read for the receiver thread count, buffer
     *            size, socket backlog and server worker thread count
     * @param i   port to bind to
     */
    public Server(Map map, int i) {
        this.storm_conf = map;
        this.port = i;
        this.queueCount = Utils.getInt(map.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1).intValue();
        this.taskToQueueId = new HashMap<>();
        this.message_queue = new LinkedBlockingQueue[this.queueCount];
        this.pendingMessages = new AtomicInteger[this.queueCount];
        for (int queueId = 0; queueId < this.queueCount; queueId++) {
            this.message_queue[queueId] = new LinkedBlockingQueue<>();
            this.pendingMessages[queueId] = new AtomicInteger(0);
        }

        int bufferSize = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)).intValue();
        int backlog = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_SOCKET_BACKLOG), 500).intValue();
        int maxWorkers = Utils.getInt(map.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS)).intValue();

        NettyRenameThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss");
        NettyRenameThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker");
        // A non-positive worker count means "let Netty pick its default".
        if (maxWorkers > 0) {
            this.factory = new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(bossFactory),
                    Executors.newCachedThreadPool(workerFactory),
                    maxWorkers);
        } else {
            this.factory = new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(bossFactory),
                    Executors.newCachedThreadPool(workerFactory));
        }

        LOG.info("Create Netty Server " + name() + ", buffer_size: " + bufferSize + ", maxWorkers: " + maxWorkers);

        this.bootstrap = new ServerBootstrap(this.factory);
        this.bootstrap.setOption("child.tcpNoDelay", true);
        this.bootstrap.setOption("child.receiveBufferSize", Integer.valueOf(bufferSize));
        this.bootstrap.setOption("child.keepAlive", true);
        this.bootstrap.setOption("backlog", Integer.valueOf(backlog));
        this.bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));

        // Track the listening channel so that close() shuts it down as well.
        this.allChannels.add(this.bootstrap.bind(new InetSocketAddress(i)));
    }

    /**
     * Splits a batch of messages into per-queue batches.
     *
     * @param list messages to group
     * @return an array indexed by queue id (entries are {@code null} for queues
     *         that received nothing), or {@code null} if a message with task id
     *         -1 was encountered, in which case the server is marked as closing
     */
    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> list) {
        ArrayList<TaskMessage>[] grouped = new ArrayList[this.queueCount];
        for (int idx = 0; idx < list.size(); idx++) {
            TaskMessage message = list.get(idx);
            int task = message.task();
            if (task == -1) {
                // Task id -1 is the close sentinel: stop accepting and drop the batch.
                this.closing = true;
                return null;
            }
            int queueId = getMessageQueueId(task).intValue();
            if (grouped[queueId] == null) {
                grouped[queueId] = new ArrayList<>();
            }
            grouped[queueId].add(message);
        }
        return grouped;
    }

    /**
     * Returns the queue id assigned to a task, assigning the next round-robin
     * id if the task has not been seen before.
     *
     * @param i task id
     * @return queue id for the task
     */
    private Integer getMessageQueueId(int i) {
        Integer queueId = this.taskToQueueId.get(Integer.valueOf(i));
        if (queueId == null) {
            synchronized (this) {
                // Re-check under the lock: another thread may have assigned it meanwhile.
                queueId = this.taskToQueueId.get(Integer.valueOf(i));
                if (queueId == null) {
                    queueId = Integer.valueOf(this.roundRobinQueueId++);
                    if (this.roundRobinQueueId == this.queueCount) {
                        this.roundRobinQueueId = 0;
                    }
                    // Publish a fresh copy so lock-free readers never see a map being mutated.
                    HashMap<Integer, Integer> updated = new HashMap<>(this.taskToQueueId);
                    updated.put(Integer.valueOf(i), queueId);
                    this.taskToQueueId = updated;
                }
            }
        }
        return queueId;
    }

    /**
     * Adds {@code i} to the enqueued-message counter kept for {@code str}.
     *
     * @param str counter key
     * @param i   number of messages to add
     */
    private void addReceiveCount(String str, int i) {
        AtomicInteger counter = this.messagesEnqueued.get(str);
        if (counter != null) {
            counter.addAndGet(i);
            return;
        }
        // Lost the race to create the counter? Then add to the winner's instance.
        AtomicInteger existing = this.messagesEnqueued.putIfAbsent(str, new AtomicInteger(i));
        if (existing != null) {
            existing.addAndGet(i);
        }
    }

    /**
     * Accepts a batch of messages and distributes it over the receive queues.
     * Null or empty batches are ignored, as is everything once the server is
     * closing.
     *
     * @param list messages to enqueue
     * @param str  key under which the received-message count is recorded
     * @throws InterruptedException if interrupted while putting into a queue
     */
    public void enqueue(List<TaskMessage> list, String str) throws InterruptedException {
        if (list == null || list.size() == 0 || this.closing) {
            return;
        }
        addReceiveCount(str, list.size());
        ArrayList<TaskMessage>[] grouped = groupMessages(list);
        if (grouped == null || this.closing) {
            return;
        }
        for (int queueId = 0; queueId < grouped.length; queueId++) {
            ArrayList<TaskMessage> batch = grouped[queueId];
            if (batch != null) {
                this.message_queue[queueId].put(batch);
                this.pendingMessages[queueId].addAndGet(batch.size());
            }
        }
    }

    /**
     * Fetches the next batch from one of the receive queues.
     *
     * @param i  flags; when bit 0 is set the call does not block and uses
     *           {@code poll()}, otherwise it blocks in {@code take()}
     * @param i2 receiver id; the queue used is {@code i2 % queueCount}
     * @return an iterator over the next batch; an iterator over the close
     *         sentinel if the server is closing; or {@code null} if no batch
     *         was available (non-blocking) or the wait was interrupted
     */
    @Override // backtype.storm.messaging.IConnection
    public Iterator<TaskMessage> recv(int i, int i2) {
        if (this.closing) {
            return this.closeMessage.iterator();
        }
        int queueId = i2 % this.queueCount;
        ArrayList<TaskMessage> batch = null;
        if ((i & 1) == 1) {
            batch = this.message_queue[queueId].poll();
        } else {
            try {
                batch = this.message_queue[queueId].take();
                LOG.debug("request to be processed: {}", batch);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here, so callers only
                // observe a null return. Confirm callers before changing this.
                LOG.info("exception within msg receiving", e);
            }
        }
        if (batch == null) {
            return null;
        }
        this.messagesDequeued.addAndGet(batch.size());
        this.pendingMessages[queueId].addAndGet(-batch.size());
        return batch.iterator();
    }

    /**
     * Registers a channel with this server so it is closed along with it.
     * If the server has already been closed, the channel is closed instead of
     * being registered.
     *
     * @param channel channel to track
     */
    public void addChannel(Channel channel) {
        // Snapshot: close() may null the field concurrently.
        ChannelGroup channels = this.allChannels;
        if (channels == null) {
            channel.close();
            return;
        }
        channels.add(channel);
    }

    /**
     * Closes a channel, waits for the close to complete, and stops tracking it.
     *
     * @param channel channel to close
     */
    public void closeChannel(Channel channel) {
        channel.close().awaitUninterruptibly();
        // Snapshot: close() may null the field concurrently.
        ChannelGroup channels = this.allChannels;
        if (channels != null) {
            channels.remove(channel);
        }
    }

    /**
     * Closes all tracked channels, waits for them to close, and releases the
     * channel factory's resources. Subsequent calls are no-ops.
     */
    @Override // backtype.storm.messaging.IConnection
    public synchronized void close() {
        if (this.allChannels != null) {
            this.allChannels.close().awaitUninterruptibly();
            this.factory.releaseExternalResources();
            this.allChannels = null;
        }
    }

    /**
     * Not supported on the server side.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(int i, byte[] bArr) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * Not supported on the server side.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // backtype.storm.messaging.IConnection
    public void send(Iterator<TaskMessage> it) {
        throw new UnsupportedOperationException("Server connection should not send any messages");
    }

    /**
     * @return a name for this server derived from its port; also used as the
     *         prefix for the Netty boss/worker thread names
     */
    public String name() {
        return "Netty-server-localhost-" + this.port;
    }

    /**
     * Reports the connection status.
     *
     * @return {@code Closed} if the server is closing or {@link #close()} has
     *         already run; {@code Connecting} if any tracked channel is not
     *         bound; {@code Ready} otherwise
     */
    @Override // backtype.storm.messaging.ConnectionWithStatus
    public ConnectionWithStatus.Status status() {
        // Snapshot: close() nulls the field without setting 'closing'.
        ChannelGroup channels = this.allChannels;
        if (this.closing || channels == null) {
            return ConnectionWithStatus.Status.Closed;
        }
        if (!connectionEstablished(channels)) {
            return ConnectionWithStatus.Status.Connecting;
        }
        return ConnectionWithStatus.Status.Ready;
    }

    /** @return {@code true} if the channel is non-null and bound */
    private boolean connectionEstablished(Channel channel) {
        return channel != null && channel.isBound();
    }

    /** @return {@code true} if every channel in the group is bound (vacuously true for an empty group) */
    private boolean connectionEstablished(ChannelGroup channelGroup) {
        for (Channel channel : channelGroup) {
            if (!connectionEstablished(channel)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Produces a metrics snapshot and resets the interval counters.
     *
     * @return a map with keys {@code "dequeuedMessages"} (count since the last
     *         call), {@code "pending"} (list of per-queue pending counts) and
     *         {@code "enqueued"} (per-key counts since the last call; keys
     *         whose count is zero are omitted and dropped from the tracker)
     */
    @Override // backtype.storm.metric.api.IStatefulObject
    public Object getState() {
        LOG.info("Getting metrics for server on port {}", Integer.valueOf(this.port));

        HashMap<String, Object> state = new HashMap<>();
        state.put("dequeuedMessages", Integer.valueOf(this.messagesDequeued.getAndSet(0)));

        ArrayList<Integer> pending = new ArrayList<>(this.pendingMessages.length);
        for (AtomicInteger count : this.pendingMessages) {
            pending.add(Integer.valueOf(count.get()));
        }
        state.put("pending", pending);

        HashMap<String, Integer> enqueued = new HashMap<>();
        Iterator<Map.Entry<String, AtomicInteger>> entries = this.messagesEnqueued.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, AtomicInteger> entry = entries.next();
            AtomicInteger count = entry.getValue();
            if (count.get() == 0) {
                // Idle since the last snapshot: forget the key.
                // NOTE(review): a concurrent addReceiveCount holding this counter may
                // still add to it after removal; such counts would go unreported.
                entries.remove();
            } else {
                enqueued.put(entry.getKey(), Integer.valueOf(count.getAndSet(0)));
            }
        }
        state.put("enqueued", enqueued);
        return state;
    }

    @Override
    public String toString() {
        return String.format("Netty server listening on port %s", Integer.valueOf(this.port));
    }
}
