package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Tests for the {@link ListenUDP} processor.
 *
 * <p>Most tests start the processor on a free local UDP port, send datagrams to it from a
 * {@link DatagramSocket}, and then verify the flow files and provenance events produced.
 * Two tests bypass the network entirely by injecting events through {@link MockListenUDP}.
 */
@ExtendWith({MockitoExtension.class})
public class TestListenUDP {
    private static final String LOCALHOST = "localhost";

    /** Port the processor under test listens on; reassigned in {@link #setUp()}. */
    private int port = 0;
    private TestRunner runner;

    @Mock
    private ChannelResponder<DatagramChannel> responder;

    /**
     * {@link ListenUDP} variant that swaps the real dispatcher for a Mockito mock and pre-loads
     * the event queue with a fixed list of events when the processor is scheduled.
     */
    private static class MockListenUDP extends ListenUDP {
        private final List<StandardEvent<DatagramChannel>> mockEvents;

        /**
         * @param mockEvents events to add to the processor's queue on schedule
         */
        public MockListenUDP(List<StandardEvent<DatagramChannel>> mockEvents) {
            this.mockEvents = mockEvents;
        }

        /** Runs the normal scheduling logic and then enqueues the canned events. */
        @OnScheduled
        public void onScheduled(ProcessContext processContext) throws IOException {
            super.onScheduled(processContext);
            this.events.addAll(this.mockEvents);
        }

        /** Returns a mock dispatcher instead of a real one. */
        protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<StandardEvent> blockingQueue) {
            return Mockito.mock(ChannelDispatcher.class);
        }
    }

    /** Creates a runner for {@link ListenUDP} configured with a currently available UDP port. */
    @BeforeEach
    public void setUp() throws Exception {
        runner = TestRunners.newTestRunner(ListenUDP.class);
        port = NetworkUtils.getAvailableUdpPort();
        runner.setProperty(ListenUDP.PORT, Integer.toString(port));
    }

    /**
     * Checks that the configuration is valid with neither or both of Sending Host and
     * Sending Host Port set, and invalid when only one of them carries a usable value.
     */
    @Test
    public void testCustomValidation() {
        runner.setProperty(ListenUDP.PORT, "1");
        runner.assertValid();

        // Sending host without a sending host port.
        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
        runner.assertNotValid();

        // Both set.
        runner.setProperty(ListenUDP.SENDING_HOST_PORT, "1234");
        runner.assertValid();

        // Sending host port with an empty sending host.
        runner.setProperty(ListenUDP.SENDING_HOST, "");
        runner.assertNotValid();
    }

    /** Sends 15 messages and expects one flow file and one provenance event per message. */
    @Test
    public void testDefaultBehavior() throws IOException, InterruptedException {
        final List<String> messages = getMessages(15);
        final int expectedTransferred = messages.size();

        // Close the sender socket once the datagrams have been processed.
        try (DatagramSocket socket = new DatagramSocket()) {
            run(socket, messages, expectedTransferred);
        }

        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
        verifyFlowFiles(runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS));
        verifyProvenance(expectedTransferred);
    }

    /**
     * Limits the message queue to 3 entries, sends 20 messages, and expects exactly 3 flow files
     * (presumably the surplus datagrams are discarded once the queue is full).
     */
    @Test
    public void testSendingMoreThanQueueSize() throws IOException, InterruptedException {
        final int maxQueueSize = 3;
        runner.setProperty(ListenUDP.MAX_MESSAGE_QUEUE_SIZE, String.valueOf(maxQueueSize));

        try (DatagramSocket socket = new DatagramSocket()) {
            run(socket, getMessages(20), maxQueueSize);
        }

        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, maxQueueSize);
        verifyFlowFiles(runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS));
        verifyProvenance(maxQueueSize);
    }

    /**
     * Sends 5 messages from one socket with a batch size of 3 and delimiter "NN", and expects
     * two flow files: the first holding messages 1-3 and the second holding messages 4-5.
     */
    @Test
    public void testBatchingSingleSender() throws IOException, InterruptedException {
        runner.setProperty(ListenerProperties.MESSAGE_DELIMITER, "NN");
        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");

        try (DatagramSocket socket = new DatagramSocket()) {
            run(socket, getMessages(5), 2);
        }

        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS);
        flowFiles.get(0).assertContentEquals("This is message 1NNThis is message 2NNThis is message 3");
        flowFiles.get(1).assertContentEquals("This is message 4NNThis is message 5");
        verifyProvenance(2);
    }

    /**
     * Injects two events from "sender1" and two from "sender2" with a batch size of 10 and
     * expects two flow files, i.e. one per distinct sender.
     */
    @Test
    public void testBatchingWithDifferentSenders() {
        final byte[] payload = "test message".getBytes(StandardCharsets.UTF_8);
        final List<StandardEvent<DatagramChannel>> mockEvents = new ArrayList<>();
        mockEvents.add(new StandardEvent<>("sender1", payload, responder));
        mockEvents.add(new StandardEvent<>("sender1", payload, responder));
        mockEvents.add(new StandardEvent<>("sender2", payload, responder));
        mockEvents.add(new StandardEvent<>("sender2", payload, responder));

        runner = TestRunners.newTestRunner(new MockListenUDP(mockEvents));
        runner.setProperty(ListenUDP.PORT, "1");
        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
        runner.run();

        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 2);
        verifyProvenance(2);
    }

    /** Runs the processor five times with an empty event queue and expects no flow files. */
    @Test
    public void testRunWhenNoEventsAvailable() {
        runner = TestRunners.newTestRunner(new MockListenUDP(new ArrayList<>()));
        runner.setProperty(ListenUDP.PORT, "1");
        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
        runner.run(5);
        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
    }

    /**
     * Configures Sending Host / Sending Host Port to match the local socket the test sends from
     * and expects every one of the 6 messages to be received.
     */
    @Test
    public void testWithSendingHostAndPortSameAsSender() throws IOException, InterruptedException {
        final int sendingPort = NetworkUtils.getAvailableUdpPort();
        runner.setProperty(ListenUDP.SENDING_HOST, LOCALHOST);
        runner.setProperty(ListenUDP.SENDING_HOST_PORT, String.valueOf(sendingPort));

        final List<String> messages = getMessages(6);
        final int expectedTransferred = messages.size();

        // Bind the sender to the configured sending port; release the port when done.
        try (DatagramSocket socket = new DatagramSocket(sendingPort)) {
            run(socket, messages, expectedTransferred);
        }

        runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, messages.size());
        verifyFlowFiles(runner.getFlowFilesForRelationship(ListenUDP.REL_SUCCESS));
        verifyProvenance(expectedTransferred);
    }

    /**
     * Builds the messages "This is message 1" through "This is message {@code count}".
     *
     * @param count number of messages to generate
     * @return the generated messages, in ascending order
     */
    private List<String> getMessages(int count) {
        final List<String> messages = new ArrayList<>(Math.max(count, 0));
        for (int number = 1; number <= count; number++) {
            messages.add("This is message " + number);
        }
        return messages;
    }

    /**
     * Asserts that the flow file at index {@code k} contains "This is message {@code k + 1}",
     * carries the listening port in {@code udp.port}, and has a non-empty {@code udp.sender}.
     *
     * @param flowFiles flow files to check, expected in send order
     */
    private void verifyFlowFiles(List<MockFlowFile> flowFiles) {
        for (int index = 0; index < flowFiles.size(); index++) {
            final MockFlowFile flowFile = flowFiles.get(index);
            flowFile.assertContentEquals("This is message " + (index + 1));
            Assertions.assertEquals(String.valueOf(port), flowFile.getAttribute("udp.port"));
            Assertions.assertTrue(StringUtils.isNotEmpty(flowFile.getAttribute("udp.sender")));
        }
    }

    /**
     * Asserts that exactly {@code expectedEvents} provenance events were recorded and that each
     * is a RECEIVE event whose transit URI starts with "udp://".
     *
     * @param expectedEvents expected number of provenance events
     */
    private void verifyProvenance(int expectedEvents) {
        final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
        Assertions.assertEquals(expectedEvents, provenanceEvents.size());
        for (ProvenanceEventRecord event : provenanceEvents) {
            Assertions.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
            Assertions.assertTrue(event.getTransitUri().startsWith("udp://"));
        }
    }

    /**
     * Starts the processor, sends every message as a UTF-8 datagram to the listening port,
     * triggers the processor again, and asserts the number of flow files routed to success.
     * The runner is always shut down afterwards. The socket is not closed here; the caller
     * owns it.
     *
     * @param datagramSocket socket used to send the datagrams
     * @param messages message payloads to send, in order
     * @param expectedTransferred number of additional iterations to run and the number of flow
     *        files expected on the success relationship
     * @throws IOException if sending a datagram fails
     * @throws InterruptedException declared for callers; not thrown directly by this method
     */
    protected void run(DatagramSocket datagramSocket, List<String> messages, int expectedTransferred)
            throws IOException, InterruptedException {
        // First run initializes the processor and leaves it running so it can receive datagrams.
        runner.run(1, false, true);
        try {
            final InetSocketAddress destination = new InetSocketAddress(LOCALHOST, port);
            for (String message : messages) {
                final byte[] payload = message.getBytes(StandardCharsets.UTF_8);
                datagramSocket.send(new DatagramPacket(payload, payload.length, destination));
            }

            // Subsequent runs must not re-initialize or stop the already running processor.
            runner.run(expectedTransferred, false, false);
            runner.assertTransferCount(ListenUDP.REL_SUCCESS, expectedTransferred);
        } finally {
            runner.shutdown();
        }
    }
}
