package org.apache.nifi.distributed.cache.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/distributed/cache/server/AbstractCacheServer.class */
public abstract class AbstractCacheServer implements CacheServer {
    private static final Logger logger = LoggerFactory.getLogger(AbstractCacheServer.class);
    private final String identifier;
    private final int port;
    private final int maxReadSize;
    private final SSLContext sslContext;
    protected volatile boolean stopped = false;
    private final Set<Thread> processInputThreads = new CopyOnWriteArraySet();
    private volatile ServerSocketChannel serverSocketChannel;

    public AbstractCacheServer(String str, SSLContext sSLContext, int i, int i2) {
        this.identifier = str;
        this.port = i;
        this.sslContext = sSLContext;
        this.maxReadSize = i2;
    }

    @Override // org.apache.nifi.distributed.cache.server.CacheServer
    public int getPort() {
        return this.serverSocketChannel == null ? this.port : this.serverSocketChannel.socket().getLocalPort();
    }

    @Override // org.apache.nifi.distributed.cache.server.CacheServer
    public void start() throws IOException {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.serverSocketChannel.configureBlocking(true);
        this.serverSocketChannel.bind((SocketAddress) new InetSocketAddress(this.port));
        Thread thread = new Thread(new Runnable() { // from class: org.apache.nifi.distributed.cache.server.AbstractCacheServer.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        final SocketChannel accept = AbstractCacheServer.this.serverSocketChannel.accept();
                        AbstractCacheServer.logger.debug("Connected to {}", new Object[]{accept});
                        Thread thread2 = new Thread(new Runnable() { // from class: org.apache.nifi.distributed.cache.server.AbstractCacheServer.1.1
                            @Override // java.lang.Runnable
                            public void run() {
                                SocketChannelInputStream sSLSocketChannelInputStream;
                                SocketChannelOutputStream sSLSocketChannelOutputStream;
                                String hostName = accept.socket().getInetAddress().getHostName();
                                try {
                                    if (AbstractCacheServer.this.sslContext == null) {
                                        sSLSocketChannelInputStream = new SocketChannelInputStream(accept);
                                        sSLSocketChannelOutputStream = new SocketChannelOutputStream(accept);
                                    } else {
                                        SSLSocketChannel sSLSocketChannel = new SSLSocketChannel(AbstractCacheServer.this.sslContext, accept, false);
                                        sSLSocketChannel.connect();
                                        sSLSocketChannelInputStream = new SSLSocketChannelInputStream(sSLSocketChannel);
                                        sSLSocketChannelOutputStream = new SSLSocketChannelOutputStream(sSLSocketChannel);
                                    }
                                    try {
                                        try {
                                            BufferedInputStream bufferedInputStream = new BufferedInputStream(sSLSocketChannelInputStream);
                                            try {
                                                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(sSLSocketChannelOutputStream);
                                                try {
                                                    StandardVersionNegotiator versionNegotiator = AbstractCacheServer.this.getVersionNegotiator();
                                                    ProtocolHandshake.receiveHandshake(bufferedInputStream, bufferedOutputStream, versionNegotiator);
                                                    boolean z = true;
                                                    while (z) {
                                                        z = AbstractCacheServer.this.listen(bufferedInputStream, bufferedOutputStream, versionNegotiator.getVersion());
                                                    }
                                                    AbstractCacheServer.logger.debug("Client issued close on {}", new Object[]{accept});
                                                    bufferedOutputStream.close();
                                                    bufferedInputStream.close();
                                                    AbstractCacheServer.this.processInputThreads.remove(Thread.currentThread());
                                                } catch (Throwable th) {
                                                    try {
                                                        bufferedOutputStream.close();
                                                    } catch (Throwable th2) {
                                                        th.addSuppressed(th2);
                                                    }
                                                    throw th;
                                                }
                                            } catch (Throwable th3) {
                                                try {
                                                    bufferedInputStream.close();
                                                } catch (Throwable th4) {
                                                    th3.addSuppressed(th4);
                                                }
                                                throw th3;
                                            }
                                        } catch (SocketTimeoutException e) {
                                            AbstractCacheServer.logger.debug("30 sec timeout reached", e);
                                            AbstractCacheServer.this.processInputThreads.remove(Thread.currentThread());
                                        } catch (IOException | HandshakeException e2) {
                                            if (!AbstractCacheServer.this.stopped) {
                                                AbstractCacheServer.logger.error("{} unable to communicate with remote peer {} due to {}", new Object[]{this, hostName, e2.toString()});
                                                if (AbstractCacheServer.logger.isDebugEnabled()) {
                                                    AbstractCacheServer.logger.error("", e2);
                                                }
                                            }
                                            AbstractCacheServer.this.processInputThreads.remove(Thread.currentThread());
                                        }
                                    } catch (Throwable th5) {
                                        AbstractCacheServer.this.processInputThreads.remove(Thread.currentThread());
                                        throw th5;
                                    }
                                } catch (IOException e3) {
                                    AbstractCacheServer.logger.error("Cannot create input and/or output streams for {}", new Object[]{AbstractCacheServer.this.identifier}, e3);
                                    if (AbstractCacheServer.logger.isDebugEnabled()) {
                                        AbstractCacheServer.logger.error("", e3);
                                    }
                                    try {
                                        accept.close();
                                    } catch (IOException e4) {
                                    }
                                }
                            }
                        });
                        thread2.setName("Distributed Cache Server Communications Thread: " + AbstractCacheServer.this.identifier);
                        thread2.setDaemon(true);
                        thread2.start();
                        AbstractCacheServer.this.processInputThreads.add(thread2);
                    } catch (IOException e) {
                        if (AbstractCacheServer.this.stopped) {
                            return;
                        }
                        AbstractCacheServer.logger.error("{} unable to accept connection from remote peer due to {}", this, e.toString());
                        if (AbstractCacheServer.logger.isDebugEnabled()) {
                            AbstractCacheServer.logger.error("", e);
                            return;
                        }
                        return;
                    }
                }
            }
        });
        thread.setDaemon(true);
        thread.setName("Distributed Cache Server: " + this.identifier);
        thread.start();
    }

    protected StandardVersionNegotiator getVersionNegotiator() {
        return new StandardVersionNegotiator(new int[]{1});
    }

    @Override // org.apache.nifi.distributed.cache.server.CacheServer
    public void stop() throws IOException {
        this.stopped = true;
        logger.info("Stopping CacheServer {}", new Object[]{this.identifier});
        if (this.serverSocketChannel != null && this.serverSocketChannel.isOpen()) {
            try {
                this.serverSocketChannel.close();
            } catch (IOException e) {
                logger.warn("Server Socket Close Failed", e);
            }
        }
        for (Thread thread : this.processInputThreads) {
            thread.interrupt();
            int i = 0;
            while (!thread.isInterrupted()) {
                int i2 = i;
                i++;
                if (i2 < 5) {
                    try {
                        Thread.sleep(50L);
                    } catch (InterruptedException e2) {
                    }
                }
            }
        }
        this.processInputThreads.clear();
    }

    public String toString() {
        return "CacheServer[id=" + this.identifier + "]";
    }

    protected abstract boolean listen(InputStream inputStream, OutputStream outputStream, int i) throws IOException;

    /* JADX INFO: Access modifiers changed from: protected */
    public byte[] readValue(DataInputStream dataInputStream) throws IOException {
        byte[] bArr = new byte[validateSize(dataInputStream.readInt())];
        dataInputStream.readFully(bArr);
        return bArr;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int validateSize(int i) {
        if (i <= this.maxReadSize) {
            return i;
        }
        throw new IllegalStateException(String.format("Size [%d] exceeds maximum configured read [%d]", Integer.valueOf(i), Integer.valueOf(this.maxReadSize)));
    }
}
