/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.PutTCP;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

@Timeout(value=30L)
public class TestPutTCP {
    private static final String TCP_SERVER_ADDRESS = "127.0.0.1";
    private static final String SERVER_VARIABLE = "server.address";
    private static final String TCP_SERVER_ADDRESS_EL = "${server.address}";
    private static final int MIN_INVALID_PORT = 0;
    private static final int MIN_VALID_PORT = 1;
    private static final int MAX_VALID_PORT = 65535;
    private static final int MAX_INVALID_PORT = 65536;
    private static final int VALID_LARGE_FILE_SIZE = 32768;
    private static final int VALID_SMALL_FILE_SIZE = 64;
    private static final int LOAD_TEST_ITERATIONS = 500;
    private static final int LOAD_TEST_THREAD_COUNT = 1;
    private static final int DEFAULT_ITERATIONS = 1;
    private static final int DEFAULT_THREAD_COUNT = 1;
    private static final char CONTENT_CHAR = 'x';
    private static final String OUTGOING_MESSAGE_DELIMITER = "\n";
    private static final String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
    private static final String[] EMPTY_FILE = new String[]{""};
    private static final String[] VALID_FILES = new String[]{"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@\u00a3$%^&*()_+:|{}[];\\"};
    private EventServer eventServer;
    private int port;
    private TestRunner runner;
    private BlockingQueue<ByteArrayMessage> messages;

    @BeforeEach
    public void setup() throws Exception {
        this.runner = TestRunners.newTestRunner(PutTCP.class);
        this.runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
        this.port = NetworkUtils.getAvailableTcpPort();
    }

    @AfterEach
    public void cleanup() {
        this.runner.shutdown();
        this.shutdownServer();
    }

    @Test
    public void testPortProperty() {
        this.runner.setProperty(PutTCP.PORT, Integer.toString(0));
        this.runner.assertNotValid();
        this.runner.setProperty(PutTCP.PORT, Integer.toString(1));
        this.runner.assertValid();
        this.runner.setProperty(PutTCP.PORT, Integer.toString(65535));
        this.runner.assertValid();
        this.runner.setProperty(PutTCP.PORT, Integer.toString(65536));
        this.runner.assertNotValid();
    }

    @Test
    public void testRunSuccess() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessSslContextService() throws Exception {
        TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
        SSLContext sslContext = SslContextUtils.createSslContext(tlsConfiguration);
        Assertions.assertNotNull((Object)sslContext, (String)"SSLContext not found");
        String identifier = SSLContextService.class.getName();
        SSLContextService sslContextService = (SSLContextService)Mockito.mock(SSLContextService.class);
        Mockito.when((Object)sslContextService.getIdentifier()).thenReturn((Object)identifier);
        Mockito.when((Object)sslContextService.createContext()).thenReturn((Object)sslContext);
        this.runner.addControllerService(identifier, (ControllerService)sslContextService);
        this.runner.enableControllerService((ControllerService)sslContextService);
        this.runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port, sslContext);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessServerVariableExpression() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessPruneSenders() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertTransfers(VALID_FILES.length);
        this.assertMessagesReceived(VALID_FILES);
        this.runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
        this.runner.run(1, false, false);
        this.runner.clearTransferState();
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessMultiCharDelimiter() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionPerFlowFile() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionFailure() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.shutdownServer();
        this.sendTestData(VALID_FILES);
        this.runner.assertQueueEmpty();
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessEmptyFile() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        this.sendTestData(EMPTY_FILE);
        this.assertTransfers(1);
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testRunSuccessLargeValidFile() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        this.createTestServer(this.port);
        String[] testData = this.createContent(32768);
        this.sendTestData(testData);
        this.assertMessagesReceived(testData);
    }

    @Test
    public void testRunSuccessFiveHundredMessages() throws Exception {
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.createTestServer(this.port);
        String[] testData = this.createContent(64);
        this.sendTestData(testData, 500, 1);
        this.assertMessagesReceived(testData, 500);
    }

    private void createTestServer(int port) throws Exception {
        this.createTestServer(port, null);
    }

    private void createTestServer(int port, SSLContext sslContext) throws Exception {
        this.messages = new LinkedBlockingQueue<ByteArrayMessage>();
        byte[] delimiter = this.getDelimiter();
        InetAddress listenAddress = InetAddress.getByName(TCP_SERVER_ADDRESS);
        ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory((ComponentLog)this.runner.getLogger(), listenAddress, port, TransportProtocol.TCP, delimiter, 32768, this.messages);
        if (sslContext != null) {
            serverFactory.setSslContext(sslContext);
        }
        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        this.eventServer = serverFactory.getEventServer();
    }

    private void shutdownServer() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
        }
    }

    private void configureProperties(String host, String outgoingMessageDelimiter, boolean connectionPerFlowFile) {
        this.runner.setProperty(PutTCP.HOSTNAME, host);
        this.runner.setProperty(PutTCP.PORT, Integer.toString(this.port));
        if (outgoingMessageDelimiter != null) {
            this.runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
        }
        this.runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
        this.runner.assertValid();
    }

    private void sendTestData(String[] testData) {
        this.sendTestData(testData, 1, 1);
    }

    private void sendTestData(String[] testData, int iterations, int threadCount) {
        this.runner.setThreadCount(threadCount);
        for (int i = 0; i < iterations; ++i) {
            for (String item : testData) {
                this.runner.enqueue(item.getBytes());
            }
            this.runner.run(testData.length, false, i == 0);
        }
    }

    private void assertTransfers(int successCount) {
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
        this.runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
    }

    private void assertMessagesReceived(String[] sentData) throws Exception {
        this.assertMessagesReceived(sentData, 1);
        this.runner.assertQueueEmpty();
    }

    private void assertMessagesReceived(String[] sentData, int iterations) throws Exception {
        for (int i = 0; i < iterations; ++i) {
            for (String item : sentData) {
                ByteArrayMessage message = this.messages.take();
                Assertions.assertNotNull((Object)message, (String)String.format("Message [%d] not found", i));
                Assertions.assertTrue((boolean)Arrays.asList(sentData).contains(new String(message.getMessage())));
            }
        }
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
        this.runner.clearTransferState();
        Assertions.assertNull(this.messages.poll(), (String)"Unexpected extra messages found");
    }

    private String[] createContent(int size) {
        char[] content = new char[size];
        for (int i = 0; i < size; ++i) {
            content[i] = 120;
        }
        return new String[]{new String(content)};
    }

    private byte[] getDelimiter() {
        String delimiter = this.runner.getProcessContext().getProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER).getValue();
        if (delimiter != null) {
            return delimiter.getBytes();
        }
        return null;
    }
}

