/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.ListenUDP;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Tests for the {@link ListenUDP} processor.
 *
 * <p>Most tests schedule the processor on a free UDP port, send datagrams to it over a real
 * {@link DatagramSocket} and then verify the resulting flow files and provenance events. Tests
 * that need full control over the received events use {@link MockListenUDP}, which pre-populates
 * the processor's event queue and replaces the network dispatcher with a Mockito mock.
 */
@ExtendWith(MockitoExtension.class)
public class TestListenUDP {
    private static final String LOCALHOST = "localhost";

    /** Port the processor under test listens on; reassigned to a free UDP port before each test. */
    private int port = 0;
    private TestRunner runner;

    @Mock
    private ChannelResponder<DatagramChannel> responder;

    /**
     * Creates a fresh runner for {@link ListenUDP} and configures it with an available UDP port.
     */
    @BeforeEach
    public void setUp() throws Exception {
        this.runner = TestRunners.newTestRunner(ListenUDP.class);
        this.port = NetworkUtils.getAvailableUdpPort();
        this.runner.setProperty(ListenUDP.PORT, Integer.toString(this.port));
    }

    /**
     * Verifies that the sending host and the sending host port are only valid when configured
     * together: setting just one of them, or blanking the host afterwards, makes the processor
     * invalid.
     */
    @Test
    public void testCustomValidation() {
        this.runner.setProperty(ListenUDP.PORT, "1");
        this.runner.assertValid();

        // Host without a port is rejected.
        this.runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
        this.runner.assertNotValid();

        // Host together with a port is accepted.
        this.runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
        this.runner.assertValid();

        // Port with an empty host is rejected.
        this.runner.setProperty(ListenUDP.SENDING_HOST, "");
        this.runner.assertNotValid();
    }

    /**
     * Sends 15 datagrams with the default configuration and expects one flow file and one
     * provenance event per datagram.
     */
    @Test
    public void testDefaultBehavior() throws IOException, InterruptedException {
        final List<String> messages = this.getMessages(15);
        final int expectedTransferred = messages.size();

        try (DatagramSocket socket = new DatagramSocket()) {
            this.run(socket, messages, expectedTransferred);
        }

        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
        final List<MockFlowFile> mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
        this.verifyFlowFiles(mockFlowFiles);
        this.verifyProvenance(expectedTransferred);
    }

    /**
     * Sends 20 datagrams while the maximum message queue size is 3 and expects exactly 3 flow
     * files to be produced.
     */
    @Test
    public void testSendingMoreThanQueueSize() throws IOException, InterruptedException {
        final int maxQueueSize = 3;
        this.runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));

        final List<String> messages = this.getMessages(20);

        try (DatagramSocket socket = new DatagramSocket()) {
            this.run(socket, messages, maxQueueSize);
        }

        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
        final List<MockFlowFile> mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
        this.verifyFlowFiles(mockFlowFiles);
        this.verifyProvenance(maxQueueSize);
    }

    /**
     * Sends 5 datagrams from a single sender with a batch size of 3 and expects two flow files:
     * the first holding messages 1-3 and the second holding messages 4-5, each joined by the
     * configured delimiter.
     */
    @Test
    public void testBatchingSingleSender() throws IOException, InterruptedException {
        final String delimiter = "NN";
        this.runner.setProperty(ListenerProperties.MESSAGE_DELIMITER, delimiter);
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");

        final List<String> messages = this.getMessages(5);
        final int expectedTransferred = 2;

        try (DatagramSocket socket = new DatagramSocket()) {
            this.run(socket, messages, expectedTransferred);
        }

        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
        final List<MockFlowFile> mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);

        final MockFlowFile mockFlowFile1 = mockFlowFiles.get(0);
        mockFlowFile1.assertContentEquals(
                "This is message 1" + delimiter + "This is message 2" + delimiter + "This is message 3");

        final MockFlowFile mockFlowFile2 = mockFlowFiles.get(1);
        mockFlowFile2.assertContentEquals("This is message 4" + delimiter + "This is message 5");

        this.verifyProvenance(expectedTransferred);
    }

    /**
     * Queues two events from each of two distinct senders and expects two flow files and two
     * provenance events, even though the batch size (10) would fit all four events.
     */
    @Test
    public void testBatchingWithDifferentSenders() {
        final String sender1 = "sender1";
        final String sender2 = "sender2";
        final byte[] message = "test message".getBytes(StandardCharsets.UTF_8);

        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
        mockEvents.add(new StandardEvent<>(sender1, message, this.responder));
        mockEvents.add(new StandardEvent<>(sender1, message, this.responder));
        mockEvents.add(new StandardEvent<>(sender2, message, this.responder));
        mockEvents.add(new StandardEvent<>(sender2, message, this.responder));

        final MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
        this.runner = TestRunners.newTestRunner(mockListenUDP);
        this.runner.setProperty(ListenUDP.PORT, "1");
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");

        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
        this.verifyProvenance(2);
    }

    /**
     * Runs the processor five times with an empty event queue and expects no flow files.
     */
    @Test
    public void testRunWhenNoEventsAvailable() {
        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();

        final MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
        this.runner = TestRunners.newTestRunner(mockListenUDP);
        this.runner.setProperty(ListenUDP.PORT, "1");
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");

        this.runner.run(5);
        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
    }

    /**
     * Configures the sending host and port to match the socket the datagrams are sent from and
     * expects every one of the 6 datagrams to become a flow file.
     */
    @Test
    public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
        final int sendingPort = NetworkUtils.getAvailableUdpPort();
        this.runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
        this.runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));

        final List<String> messages = this.getMessages(6);
        final int expectedTransferred = messages.size();

        // Bind the sender to the configured sending port; closing it afterwards frees the port.
        try (DatagramSocket socket = new DatagramSocket(sendingPort)) {
            this.run(socket, messages, expectedTransferred);
        }

        this.runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, expectedTransferred);
        final List<MockFlowFile> mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
        this.verifyFlowFiles(mockFlowFiles);
        this.verifyProvenance(expectedTransferred);
    }

    /**
     * Builds the test payloads {@code "This is message 1"} through {@code "This is message N"}.
     *
     * @param numMessages number of messages to generate
     * @return the generated messages, in ascending order
     */
    private List<String> getMessages(int numMessages) {
        final List<String> messages = new ArrayList<>();
        for (int i = 0; i < numMessages; ++i) {
            messages.add("This is message " + (i + 1));
        }
        return messages;
    }

    /**
     * Asserts that the i-th flow file contains the i-th generated message, carries the listening
     * port in {@code udp.port} and has a non-empty {@code udp.sender} attribute.
     *
     * @param mockFlowFiles flow files to check, in the order they were transferred
     */
    private void verifyFlowFiles(List<MockFlowFile> mockFlowFiles) {
        for (int i = 0; i < mockFlowFiles.size(); ++i) {
            final MockFlowFile flowFile = mockFlowFiles.get(i);
            flowFile.assertContentEquals("This is message " + (i + 1));
            Assertions.assertEquals(String.valueOf(this.port), flowFile.getAttribute("udp.port"));
            Assertions.assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute("udp.sender")));
        }
    }

    /**
     * Asserts that exactly {@code expectedNumEvents} provenance events were recorded and that each
     * is a {@code RECEIVE} event whose transit URI starts with {@code udp://}.
     *
     * @param expectedNumEvents expected number of provenance events
     */
    private void verifyProvenance(int expectedNumEvents) {
        final List<ProvenanceEventRecord> provEvents = this.runner.getProvenanceEvents();
        Assertions.assertEquals(expectedNumEvents, provEvents.size());
        for (final ProvenanceEventRecord event : provEvents) {
            Assertions.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
            Assertions.assertTrue(event.getTransitUri().startsWith("udp://"));
        }
    }

    /**
     * Runs the processor, sends every message to it as a separate datagram and asserts the number
     * of flow files routed to success. The runner is always shut down afterwards.
     *
     * <p>The socket is not closed here; the caller owns it and is responsible for closing it.
     *
     * @param socket socket used to send the datagrams
     * @param messages payloads to send, one datagram each, encoded as UTF-8
     * @param expectedTransferred number of processor iterations to run after sending, and the
     *        number of flow files expected on the success relationship
     * @throws IOException if sending a datagram fails
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    protected void run(DatagramSocket socket, List<String> messages, int expectedTransferred) throws IOException, InterruptedException {
        // First pass: initialize and run once without stopping, so the processor is up before sending.
        this.runner.run(1, false, true);
        try {
            final InetSocketAddress destination = new InetSocketAddress(LOCALHOST, this.port);
            for (final String message : messages) {
                final byte[] buffer = message.getBytes(StandardCharsets.UTF_8);
                final DatagramPacket packet = new DatagramPacket(buffer, buffer.length, destination);
                socket.send(packet);
            }

            // Second pass: no re-initialization, one iteration per expected flow file.
            this.runner.run(expectedTransferred, false, false);
            this.runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
        } finally {
            this.runner.shutdown();
        }
    }

    /**
     * {@link ListenUDP} variant that never touches the network: the dispatcher is a Mockito mock
     * and the event queue is pre-populated with the events supplied at construction time.
     */
    private static class MockListenUDP extends ListenUDP {
        private final List<StandardEvent<DatagramChannel>> mockEvents;

        /**
         * @param mockEvents events to place on the processor's queue when it is scheduled
         */
        public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
            this.mockEvents = mockEvents;
        }

        /**
         * Performs the regular scheduling and then enqueues the canned events.
         */
        @OnScheduled
        @Override
        public void onScheduled(ProcessContext context) throws IOException {
            super.onScheduled(context);
            this.events.addAll(this.mockEvents);
        }

        /**
         * Returns a mock dispatcher instead of a real one.
         */
        @Override
        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) {
            return Mockito.mock(ChannelDispatcher.class);
        }
    }
}

