/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gettcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractSocketHandler {
    final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final ByteBuffer readingBuffer;
    private final Runnable listenerTask;
    private volatile ExecutorService listenerTaskExecutor;
    final InetSocketAddress address;
    volatile NetworkChannel rootChannel;
    volatile Selector selector;
    private final AtomicBoolean isRunning;
    protected final byte endOfMessageByte;

    public AbstractSocketHandler(InetSocketAddress address, int readingBufferSize, byte endOfMessageByte) {
        this.address = address;
        this.listenerTask = new ListenerTask();
        this.readingBuffer = ByteBuffer.allocate(readingBufferSize);
        this.isRunning = new AtomicBoolean();
        this.endOfMessageByte = endOfMessageByte;
    }

    public void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            try {
                if (this.selector == null || !this.selector.isOpen()) {
                    this.selector = Selector.open();
                    InetSocketAddress connectedAddress = this.connect();
                    this.listenerTaskExecutor = Executors.newCachedThreadPool();
                    this.listenerTaskExecutor.execute(this.listenerTask);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Started listener for " + this.getClass().getSimpleName());
                    }
                    if (this.logger.isInfoEnabled()) {
                        this.logger.info("Successfully bound to " + connectedAddress);
                    }
                }
            }
            catch (Exception e) {
                this.stop();
                throw new IllegalStateException("Failed to start " + this.getClass().getName(), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        block10: {
            if (this.isRunning.compareAndSet(true, false)) {
                try {
                    if (this.selector == null || !this.selector.isOpen()) break block10;
                    HashSet<SelectionKey> selectionKeys = new HashSet<SelectionKey>(this.selector.keys());
                    for (SelectionKey key : selectionKeys) {
                        key.cancel();
                        try {
                            key.channel().close();
                        }
                        catch (IOException e) {
                            this.logger.warn("Failure while closing channel", (Throwable)e);
                        }
                    }
                    try {
                        this.selector.close();
                    }
                    catch (Exception e) {
                        this.logger.warn("Failure while closinig selector", (Throwable)e);
                    }
                    this.logger.info(this.getClass().getSimpleName() + " is stopped listening on " + this.address);
                }
                finally {
                    if (this.listenerTaskExecutor != null) {
                        this.listenerTaskExecutor.shutdown();
                    }
                }
            }
        }
    }

    public boolean isRunning() {
        return this.isRunning.get();
    }

    abstract InetSocketAddress connect() throws Exception;

    abstract void processData(SelectionKey var1, ByteBuffer var2) throws IOException;

    void doAccept(SelectionKey selectionKey) throws IOException {
    }

    private class ListenerTask
    implements Runnable {
        private ListenerTask() {
        }

        @Override
        public void run() {
            try {
                while (AbstractSocketHandler.this.rootChannel != null && AbstractSocketHandler.this.rootChannel.isOpen() && AbstractSocketHandler.this.selector.isOpen()) {
                    if (!AbstractSocketHandler.this.selector.isOpen() || AbstractSocketHandler.this.selector.select(10L) <= 0) continue;
                    Iterator<SelectionKey> keys = AbstractSocketHandler.this.selector.selectedKeys().iterator();
                    while (keys.hasNext()) {
                        SelectionKey selectionKey = keys.next();
                        keys.remove();
                        if (!selectionKey.isValid()) continue;
                        if (selectionKey.isAcceptable()) {
                            this.accept(selectionKey);
                            continue;
                        }
                        if (selectionKey.isReadable()) {
                            this.read(selectionKey);
                            continue;
                        }
                        if (!selectionKey.isConnectable()) continue;
                        this.connect(selectionKey);
                    }
                }
            }
            catch (Exception e) {
                AbstractSocketHandler.this.logger.error("Exception in socket listener loop", (Throwable)e);
            }
            AbstractSocketHandler.this.logger.debug("Exited Listener loop.");
            AbstractSocketHandler.this.stop();
        }

        private void accept(SelectionKey selectionKey) throws IOException {
            AbstractSocketHandler.this.doAccept(selectionKey);
        }

        private void connect(SelectionKey selectionKey) throws IOException {
            SocketChannel clientChannel = (SocketChannel)selectionKey.channel();
            if (clientChannel.isConnectionPending()) {
                clientChannel.finishConnect();
            }
            clientChannel.register(AbstractSocketHandler.this.selector, 1);
        }

        private void read(SelectionKey selectionKey) throws IOException {
            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
            int count = -1;
            boolean finished = false;
            while (!finished && (count = socketChannel.read(AbstractSocketHandler.this.readingBuffer)) > 0) {
                byte lastByte = AbstractSocketHandler.this.readingBuffer.get(AbstractSocketHandler.this.readingBuffer.position() - 1);
                if (AbstractSocketHandler.this.readingBuffer.remaining() != 0 && lastByte != AbstractSocketHandler.this.endOfMessageByte) continue;
                this.processBuffer(selectionKey);
                if (lastByte != AbstractSocketHandler.this.endOfMessageByte) continue;
                finished = true;
            }
            if (count == -1) {
                if (AbstractSocketHandler.this.readingBuffer.position() > 0) {
                    this.processBuffer(selectionKey);
                }
                selectionKey.cancel();
                socketChannel.close();
                if (AbstractSocketHandler.this.logger.isInfoEnabled()) {
                    AbstractSocketHandler.this.logger.info("Connection closed by: " + socketChannel.socket());
                }
            }
        }

        private void processBuffer(SelectionKey selectionKey) throws IOException {
            AbstractSocketHandler.this.readingBuffer.flip();
            byte[] message = new byte[AbstractSocketHandler.this.readingBuffer.limit()];
            AbstractSocketHandler.this.readingBuffer.get(message);
            AbstractSocketHandler.this.processData(selectionKey, ByteBuffer.wrap(message));
            AbstractSocketHandler.this.readingBuffer.clear();
        }
    }
}

