/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.net.InetAddress;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.PutUDP;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=10L)
public class TestPutUDP {
    private static final String UDP_SERVER_ADDRESS = "127.0.0.1";
    private static final String SERVER_VARIABLE = "SERVER";
    private static final String DELIMITER = "\n";
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final int MAX_FRAME_LENGTH = 32800;
    private static final int VALID_LARGE_FILE_SIZE = 32768;
    private static final int INVALID_LARGE_FILE_SIZE = 1000000;
    private static final char CONTENT_CHAR = 'x';
    private static final int DATA_WAIT_PERIOD = 50;
    private static final String[] EMPTY_FILE = new String[]{""};
    private static final String[] VALID_FILES = new String[]{"FIRST", "SECOND", "12345678", "343424222", "!@\u00a3$%^&*()_+:|{}[];\\"};
    private TestRunner runner;
    private int port;
    private EventServer eventServer;
    private BlockingQueue<ByteArrayMessage> messages;

    @BeforeEach
    public void setup() throws Exception {
        this.runner = TestRunners.newTestRunner(PutUDP.class);
        this.runner.setVariable(SERVER_VARIABLE, UDP_SERVER_ADDRESS);
        this.createTestServer(32768);
    }

    @AfterEach
    public void cleanup() {
        this.runner.shutdown();
        this.removeTestServer();
    }

    @Test
    public void testSend() throws Exception {
        this.configureProperties();
        this.sendMessages(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testSendEmptyFile() throws Exception {
        this.configureProperties();
        this.sendMessages(EMPTY_FILE);
        this.checkRelationships(EMPTY_FILE.length, 0);
        this.checkNoDataReceived();
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testSendLargeFile() throws Exception {
        this.configureProperties();
        String[] testData = this.createContent(32768);
        this.sendMessages(testData);
        this.assertMessagesReceived(testData);
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testSendLargeFileInvalid() throws Exception {
        this.configureProperties();
        String[] testData = this.createContent(1000000);
        this.sendMessages(testData);
        this.checkRelationships(0, testData.length);
        this.checkNoDataReceived();
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testSendChangePropertiesAndSend() throws Exception {
        this.configureProperties();
        this.sendMessages(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.reset();
        this.configureProperties();
        this.sendMessages(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.reset();
        this.configureProperties();
        this.sendMessages(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.runner.assertQueueEmpty();
    }

    private void reset() throws Exception {
        this.runner.clearTransferState();
        this.removeTestServer();
        this.createTestServer(32800);
    }

    private void configureProperties() {
        this.runner.setProperty(PutUDP.HOSTNAME, UDP_SERVER_ADDRESS);
        this.runner.setProperty(PutUDP.PORT, Integer.toString(this.port));
        this.runner.assertValid();
    }

    private void sendMessages(String[] testData) {
        for (String item : testData) {
            this.runner.enqueue(item.getBytes());
            this.runner.run();
        }
    }

    private void checkRelationships(int successCount, int failedCount) {
        this.runner.assertTransferCount(PutUDP.REL_SUCCESS, successCount);
        this.runner.assertTransferCount(PutUDP.REL_FAILURE, failedCount);
    }

    private void checkNoDataReceived() throws Exception {
        Thread.sleep(50L);
        Assertions.assertNull(this.messages.poll(), (String)"Unexpected extra messages found");
    }

    private void assertMessagesReceived(String[] sentMessages) throws Exception {
        for (String item : sentMessages) {
            ByteArrayMessage packet = this.messages.take();
            Assertions.assertNotNull((Object)packet);
            Assertions.assertArrayEquals((byte[])item.getBytes(), (byte[])packet.getMessage());
        }
        this.runner.assertTransferCount(PutUDP.REL_SUCCESS, sentMessages.length);
        Assertions.assertNull(this.messages.poll(), (String)"Unexpected extra messages found");
    }

    private String[] createContent(int size) {
        char[] content = new char[size];
        for (int i = 0; i < size; ++i) {
            content[i] = 120;
        }
        return new String[]{new String(content).concat(DELIMITER)};
    }

    private void createTestServer(int frameSize) throws Exception {
        this.messages = new LinkedBlockingQueue<ByteArrayMessage>();
        byte[] delimiter = DELIMITER.getBytes(CHARSET);
        InetAddress listenAddress = InetAddress.getByName(UDP_SERVER_ADDRESS);
        ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory((ComponentLog)this.runner.getLogger(), listenAddress, this.port, TransportProtocol.UDP, delimiter, frameSize, this.messages);
        serverFactory.setSocketReceiveBuffer(Integer.valueOf(32800));
        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        this.eventServer = serverFactory.getEventServer();
        this.port = this.eventServer.getListeningPort();
    }

    private void removeTestServer() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
            this.eventServer = null;
        }
    }
}

