/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.EOFException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.InvalidReceiveException;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Selector
implements Selectable {
    private static final Logger log = LoggerFactory.getLogger(Selector.class);
    private final java.nio.channels.Selector nioSelector;
    private final Map<String, SelectionKey> keys;
    private final List<Send> completedSends;
    private final List<NetworkReceive> completedReceives;
    private final List<String> disconnected;
    private final List<String> connected;
    private final List<String> failedSends;
    private final Time time;
    private final SelectorMetrics sensors;
    private final String metricGrpPrefix;
    private final Map<String, String> metricTags;
    private final Map<String, Long> lruConnections;
    private final long connectionsMaxIdleNanos;
    private final int maxReceiveSize;
    private final boolean metricsPerConnection;
    private long currentTimeNanos;
    private long nextIdleCloseCheckTime;

    public Selector(int maxReceiveSize, long connectionMaxIdleMs, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
        try {
            this.nioSelector = java.nio.channels.Selector.open();
        }
        catch (IOException e) {
            throw new KafkaException(e);
        }
        this.maxReceiveSize = maxReceiveSize;
        this.connectionsMaxIdleNanos = connectionMaxIdleMs * 1000L * 1000L;
        this.time = time;
        this.metricGrpPrefix = metricGrpPrefix;
        this.metricTags = metricTags;
        this.keys = new HashMap<String, SelectionKey>();
        this.completedSends = new ArrayList<Send>();
        this.completedReceives = new ArrayList<NetworkReceive>();
        this.connected = new ArrayList<String>();
        this.disconnected = new ArrayList<String>();
        this.failedSends = new ArrayList<String>();
        this.sensors = new SelectorMetrics(metrics);
        this.lruConnections = new LinkedHashMap<String, Long>(16, 0.75f, true);
        this.currentTimeNanos = new SystemTime().nanoseconds();
        this.nextIdleCloseCheckTime = this.currentTimeNanos + this.connectionsMaxIdleNanos;
        this.metricsPerConnection = metricsPerConnection;
    }

    public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, Map<String, String> metricTags) {
        this(-1, connectionMaxIdleMS, metrics, time, metricGrpPrefix, metricTags, true);
    }

    @Override
    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
        if (this.keys.containsKey(id)) {
            throw new IllegalStateException("There is already a connection for id " + id);
        }
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        socket.setKeepAlive(true);
        socket.setSendBufferSize(sendBufferSize);
        socket.setReceiveBufferSize(receiveBufferSize);
        socket.setTcpNoDelay(true);
        try {
            channel.connect(address);
        }
        catch (UnresolvedAddressException e) {
            channel.close();
            throw new IOException("Can't resolve address: " + address, e);
        }
        catch (IOException e) {
            channel.close();
            throw e;
        }
        SelectionKey key = channel.register(this.nioSelector, 8);
        key.attach(new Transmissions(id));
        this.keys.put(id, key);
    }

    public void register(String id, SocketChannel channel) throws ClosedChannelException {
        SelectionKey key = channel.register(this.nioSelector, 1);
        key.attach(new Transmissions(id));
        this.keys.put(id, key);
    }

    @Override
    public void disconnect(String id) {
        SelectionKey key = this.keys.get(id);
        if (key != null) {
            key.cancel();
        }
    }

    @Override
    public void wakeup() {
        this.nioSelector.wakeup();
    }

    @Override
    public void close() {
        LinkedList<String> connections = new LinkedList<String>(this.keys.keySet());
        for (String id : connections) {
            this.close(id);
        }
        try {
            this.nioSelector.close();
        }
        catch (IOException e) {
            log.error("Exception closing nioSelector:", (Throwable)e);
        }
    }

    @Override
    public void send(Send send) {
        SelectionKey key = this.keyForId(send.destination());
        Transmissions transmissions = this.transmissions(key);
        if (transmissions.hasSend()) {
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        }
        transmissions.send = send;
        try {
            key.interestOps(key.interestOps() | 4);
        }
        catch (CancelledKeyException e) {
            this.close(transmissions.id);
            this.failedSends.add(send.destination());
        }
    }

    @Override
    public void poll(long timeout) throws IOException {
        long endSelect;
        this.clear();
        long startSelect = this.time.nanoseconds();
        int readyKeys = this.select(timeout);
        this.currentTimeNanos = endSelect = this.time.nanoseconds();
        this.sensors.selectTime.record(endSelect - startSelect, this.time.milliseconds());
        if (readyKeys > 0) {
            Set<SelectionKey> keys = this.nioSelector.selectedKeys();
            Iterator<SelectionKey> iter = keys.iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                Transmissions transmissions = this.transmissions(key);
                SocketChannel channel = this.channel(key);
                this.sensors.maybeRegisterConnectionMetrics(transmissions.id);
                this.lruConnections.put(transmissions.id, this.currentTimeNanos);
                try {
                    if (key.isConnectable()) {
                        channel.finishConnect();
                        key.interestOps(key.interestOps() & 0xFFFFFFF7 | 1);
                        this.connected.add(transmissions.id);
                        this.sensors.connectionCreated.record();
                        log.debug("Connection {} created", (Object)transmissions.id);
                    }
                    if (key.isReadable()) {
                        if (!transmissions.hasReceive()) {
                            transmissions.receive = new NetworkReceive(this.maxReceiveSize, transmissions.id);
                        }
                        try {
                            transmissions.receive.readFrom(channel);
                        }
                        catch (InvalidReceiveException e) {
                            log.error("Invalid data received from " + transmissions.id + " closing connection", (Throwable)e);
                            this.close(transmissions.id);
                            throw e;
                        }
                        if (transmissions.receive.complete()) {
                            transmissions.receive.payload().rewind();
                            this.completedReceives.add(transmissions.receive);
                            this.sensors.recordBytesReceived(transmissions.id, transmissions.receive.payload().limit());
                            transmissions.clearReceive();
                        }
                    }
                    if (key.isWritable()) {
                        transmissions.send.writeTo(channel);
                        if (transmissions.send.completed()) {
                            this.completedSends.add(transmissions.send);
                            this.sensors.recordBytesSent(transmissions.id, transmissions.send.size());
                            transmissions.clearSend();
                            key.interestOps(key.interestOps() & 0xFFFFFFFB);
                        }
                    }
                    if (key.isValid()) continue;
                    this.close(transmissions.id);
                    this.disconnected.add(transmissions.id);
                }
                catch (IOException e) {
                    String desc = this.socketDescription(channel);
                    if (e instanceof EOFException || e instanceof ConnectException) {
                        log.debug("Connection {} disconnected", (Object)desc);
                    } else {
                        log.warn("Error in I/O with connection to {}", (Object)desc, (Object)e);
                    }
                    this.close(transmissions.id);
                    this.disconnected.add(transmissions.id);
                }
            }
        }
        long endIo = this.time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, this.time.milliseconds());
        this.maybeCloseOldestConnection();
    }

    private String socketDescription(SocketChannel channel) {
        Socket socket = channel.socket();
        if (socket == null) {
            return "[unconnected socket]";
        }
        if (socket.getInetAddress() != null) {
            return socket.getInetAddress().toString();
        }
        return socket.getLocalAddress().toString();
    }

    @Override
    public List<Send> completedSends() {
        return this.completedSends;
    }

    @Override
    public List<NetworkReceive> completedReceives() {
        return this.completedReceives;
    }

    @Override
    public List<String> disconnected() {
        return this.disconnected;
    }

    @Override
    public List<String> connected() {
        return this.connected;
    }

    @Override
    public void mute(String id) {
        this.mute(this.keyForId(id));
    }

    private void mute(SelectionKey key) {
        key.interestOps(key.interestOps() & 0xFFFFFFFE);
    }

    @Override
    public void unmute(String id) {
        this.unmute(this.keyForId(id));
    }

    private void unmute(SelectionKey key) {
        key.interestOps(key.interestOps() | 1);
    }

    @Override
    public void muteAll() {
        for (SelectionKey key : this.keys.values()) {
            this.mute(key);
        }
    }

    @Override
    public void unmuteAll() {
        for (SelectionKey key : this.keys.values()) {
            this.unmute(key);
        }
    }

    private void maybeCloseOldestConnection() {
        if (this.currentTimeNanos > this.nextIdleCloseCheckTime) {
            if (this.lruConnections.isEmpty()) {
                this.nextIdleCloseCheckTime = this.currentTimeNanos + this.connectionsMaxIdleNanos;
            } else {
                Map.Entry<String, Long> oldestConnectionEntry = this.lruConnections.entrySet().iterator().next();
                Long connectionLastActiveTime = oldestConnectionEntry.getValue();
                this.nextIdleCloseCheckTime = connectionLastActiveTime + this.connectionsMaxIdleNanos;
                if (this.currentTimeNanos > this.nextIdleCloseCheckTime) {
                    String connectionId = oldestConnectionEntry.getKey();
                    if (log.isTraceEnabled()) {
                        log.trace("About to close the idle connection from " + connectionId + " due to being idle for " + (this.currentTimeNanos - connectionLastActiveTime) / 1000L / 1000L + " millis");
                    }
                    this.disconnected.add(connectionId);
                    this.close(connectionId);
                }
            }
        }
    }

    private void clear() {
        this.completedSends.clear();
        this.completedReceives.clear();
        this.connected.clear();
        this.disconnected.clear();
        this.disconnected.addAll(this.failedSends);
        this.failedSends.clear();
    }

    private int select(long ms) throws IOException {
        if (ms == 0L) {
            return this.nioSelector.selectNow();
        }
        if (ms < 0L) {
            return this.nioSelector.select();
        }
        return this.nioSelector.select(ms);
    }

    public void close(String id) {
        SelectionKey key = this.keyForId(id);
        this.lruConnections.remove(id);
        SocketChannel channel = this.channel(key);
        Transmissions trans = this.transmissions(key);
        if (trans != null) {
            this.keys.remove(trans.id);
            trans.clearReceive();
            trans.clearSend();
        }
        key.attach(null);
        key.cancel();
        try {
            channel.socket().close();
            channel.close();
        }
        catch (IOException e) {
            log.error("Exception closing connection to node {}:", (Object)trans.id, (Object)e);
        }
        this.sensors.connectionClosed.record();
    }

    private SelectionKey keyForId(String id) {
        SelectionKey key = this.keys.get(id);
        if (key == null) {
            throw new IllegalStateException("Attempt to write to socket for which there is no open connection. Connection id " + id + " existing connections " + this.keys.keySet().toString());
        }
        return key;
    }

    private Transmissions transmissions(SelectionKey key) {
        return (Transmissions)key.attachment();
    }

    private SocketChannel channel(SelectionKey key) {
        return (SocketChannel)key.channel();
    }

    private class SelectorMetrics {
        private final Metrics metrics;
        public final Sensor connectionClosed;
        public final Sensor connectionCreated;
        public final Sensor bytesTransferred;
        public final Sensor bytesSent;
        public final Sensor bytesReceived;
        public final Sensor selectTime;
        public final Sensor ioTime;

        public SelectorMetrics(Metrics metrics) {
            this.metrics = metrics;
            String metricGrpName = Selector.this.metricGrpPrefix + "-metrics";
            StringBuilder tagsSuffix = new StringBuilder();
            for (Map.Entry tag : Selector.this.metricTags.entrySet()) {
                tagsSuffix.append((String)tag.getKey());
                tagsSuffix.append("-");
                tagsSuffix.append((String)tag.getValue());
            }
            this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString());
            MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", Selector.this.metricTags);
            this.connectionClosed.add(metricName, new Rate());
            this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString());
            metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", Selector.this.metricTags);
            this.connectionCreated.add(metricName, new Rate());
            this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString());
            metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", Selector.this.metricTags);
            this.bytesTransferred.add(metricName, new Rate(new Count()));
            this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate());
            metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Rate(new Count()));
            metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Avg());
            metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", Selector.this.metricTags);
            this.bytesSent.add(metricName, new Max());
            this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), this.bytesTransferred);
            metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate());
            metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", Selector.this.metricTags);
            this.bytesReceived.add(metricName, new Rate(new Count()));
            this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString());
            metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(new Count()));
            metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Avg());
            metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", Selector.this.metricTags);
            this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString());
            metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", Selector.this.metricTags);
            this.ioTime.add(metricName, new Avg());
            metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", Selector.this.metricTags);
            this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS));
            metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", Selector.this.metricTags);
            this.metrics.addMetric(metricName, new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return Selector.this.keys.size();
                }
            });
        }

        public void maybeRegisterConnectionMetrics(String connectionId) {
            String nodeRequestName;
            Sensor nodeRequest;
            if (!connectionId.isEmpty() && Selector.this.metricsPerConnection && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) == null) {
                String metricGrpName = Selector.this.metricGrpPrefix + "-node-metrics";
                LinkedHashMap<String, String> tags = new LinkedHashMap<String, String>(Selector.this.metricTags);
                tags.put("node-id", "node-" + connectionId);
                nodeRequest = this.metrics.sensor(nodeRequestName);
                MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags);
                nodeRequest.add(metricName, new Rate());
                metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags);
                nodeRequest.add(metricName, new Rate(new Count()));
                metricName = new MetricName("request-size-avg", metricGrpName, "The average size of all requests in the window..", tags);
                nodeRequest.add(metricName, new Avg());
                metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", tags);
                nodeRequest.add(metricName, new Max());
                String nodeResponseName = "node-" + connectionId + ".bytes-received";
                Sensor nodeResponse = this.metrics.sensor(nodeResponseName);
                metricName = new MetricName("incoming-byte-rate", metricGrpName, tags);
                nodeResponse.add(metricName, new Rate());
                metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags);
                nodeResponse.add(metricName, new Rate(new Count()));
                String nodeTimeName = "node-" + connectionId + ".latency";
                Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName);
                metricName = new MetricName("request-latency-avg", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Avg());
                metricName = new MetricName("request-latency-max", metricGrpName, tags);
                nodeRequestTime.add(metricName, new Max());
            }
        }

        public void recordBytesSent(String connectionId, long bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesSent.record(bytes, now);
            if (!connectionId.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connectionId + ".bytes-sent")) != null) {
                nodeRequest.record(bytes, now);
            }
        }

        public void recordBytesReceived(String connection, int bytes) {
            String nodeRequestName;
            Sensor nodeRequest;
            long now = Selector.this.time.milliseconds();
            this.bytesReceived.record(bytes, now);
            if (!connection.isEmpty() && (nodeRequest = this.metrics.getSensor(nodeRequestName = "node-" + connection + ".bytes-received")) != null) {
                nodeRequest.record(bytes, now);
            }
        }
    }

    private static class Transmissions {
        public String id;
        public Send send;
        public NetworkReceive receive;

        public Transmissions(String id) {
            this.id = id;
        }

        public boolean hasSend() {
            return this.send != null;
        }

        public void clearSend() {
            this.send = null;
        }

        public boolean hasReceive() {
            return this.receive != null;
        }

        public void clearReceive() {
            this.receive = null;
        }
    }
}

