/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gettcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.processors.gettcp.AbstractSocketHandler;
import org.apache.nifi.processors.gettcp.MessageHandler;

class ReceivingClient
extends AbstractSocketHandler {
    private final ScheduledExecutorService connectionScheduler;
    private volatile int reconnectAttempts;
    private volatile long delayMillisBeforeReconnect;
    private volatile MessageHandler messageHandler;
    private volatile InetSocketAddress connectedAddress;

    public ReceivingClient(InetSocketAddress address, ScheduledExecutorService connectionScheduler, int readingBufferSize, byte endOfMessageByte) {
        super(address, readingBufferSize, endOfMessageByte);
        this.connectionScheduler = connectionScheduler;
    }

    public void setReconnectAttempts(int reconnectAttempts) {
        this.reconnectAttempts = reconnectAttempts;
    }

    public void setDelayMillisBeforeReconnect(long delayMillisBeforeReconnect) {
        this.delayMillisBeforeReconnect = delayMillisBeforeReconnect;
    }

    public void setMessageHandler(MessageHandler messageHandler) {
        this.messageHandler = messageHandler;
    }

    @Override
    InetSocketAddress connect() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicInteger attempt = new AtomicInteger();
        final AtomicReference connectionError = new AtomicReference();
        this.connectionScheduler.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    ReceivingClient.this.rootChannel = ReceivingClient.this.doConnect(ReceivingClient.this.address);
                    latch.countDown();
                    ReceivingClient.this.connectedAddress = ReceivingClient.this.address;
                }
                catch (Exception e) {
                    if (ReceivingClient.this.logger.isInfoEnabled()) {
                        ReceivingClient.this.logger.info("Failed to connect to primary endpoint '" + ReceivingClient.this.address + "'.");
                    }
                    if (attempt.incrementAndGet() <= ReceivingClient.this.reconnectAttempts) {
                        if (ReceivingClient.this.logger.isInfoEnabled()) {
                            ReceivingClient.this.logger.info("Will attempt to reconnect to '" + ReceivingClient.this.address + "'.");
                        }
                        ReceivingClient.this.connectionScheduler.schedule(this, ReceivingClient.this.delayMillisBeforeReconnect, TimeUnit.MILLISECONDS);
                    }
                    connectionError.set(e);
                    ReceivingClient.this.logger.error("Failed to connect to secondary endpoint.");
                    latch.countDown();
                }
            }
        });
        try {
            boolean finishedTask = latch.await((long)this.reconnectAttempts * this.delayMillisBeforeReconnect + 2000L, TimeUnit.MILLISECONDS);
            if (finishedTask) {
                if (connectionError.get() != null) {
                    throw (Exception)connectionError.get();
                }
            } else {
                this.logger.error("Exceeded wait time to connect. Possible deadlock, please report!. Interrupting.");
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Current thread is interrupted");
        }
        return this.connectedAddress;
    }

    private SocketChannel doConnect(InetSocketAddress addressToConnect) throws IOException {
        SocketChannel channel = SocketChannel.open();
        if (!channel.connect(addressToConnect)) {
            throw new IllegalStateException("Failed to connect to Server at: " + addressToConnect);
        }
        channel.configureBlocking(false);
        channel.register(this.selector, 1);
        return channel;
    }

    @Override
    void processData(SelectionKey selectionKey, ByteBuffer messageBuffer) throws IOException {
        byte[] message = new byte[messageBuffer.limit()];
        this.logger.debug("Received message(size=" + message.length + ")");
        messageBuffer.get(message);
        byte lastByteValue = message[message.length - 1];
        boolean partialMessage = false;
        if (lastByteValue != this.endOfMessageByte) {
            partialMessage = true;
            selectionKey.attach(1);
        } else {
            Integer wasLastPartial = (Integer)selectionKey.attachment();
            if (wasLastPartial != null && wasLastPartial == 1) {
                partialMessage = true;
                selectionKey.attach(0);
            }
        }
        if (this.messageHandler != null) {
            this.messageHandler.handle(this.connectedAddress, message, partialMessage);
        }
    }
}

