/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processors.standard.ListenUDP;
import org.apache.nifi.processors.standard.ListenUDPRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Unit tests for {@link ListenUDPRecord}.
 *
 * <p>The processor under test is replaced by {@link TestableListenUDPRecord}, which returns a
 * mocked dispatcher and serves events from in-memory queues, so each test controls exactly
 * which datagrams the processor sees.
 */
public class TestListenUDPRecord {
    static final String SCHEMA_TEXT = "{\n"
            + "  \"name\": \"syslogRecord\",\n"
            + "  \"namespace\": \"nifi\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"fields\": [\n"
            + "    { \"name\": \"timestamp\", \"type\": \"string\" },\n"
            + "    { \"name\": \"logsource\", \"type\": \"string\" },\n"
            + "    { \"name\": \"message\", \"type\": \"string\" }\n"
            + "  ]\n"
            + "}";

    // Each single datagram is a JSON array holding exactly one record.
    static final String DATAGRAM_1 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"} ]";
    static final String DATAGRAM_2 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"} ]";
    static final String DATAGRAM_3 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"} ]";

    // Each multi datagram is a JSON array holding three records.
    static final String MULTI_DATAGRAM_1 =
            "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}]";
    static final String MULTI_DATAGRAM_2 =
            "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 4\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 5\"},"
            + "{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 6\"}]";

    private static final String SENDER = "foo";
    private static final String READER_ID = "record-reader";
    private static final String WRITER_ID = "record-writer";
    private static final String WRITER_HEADER = "timestamp, logsource, message";
    private static final String RECORD_COUNT_ATTRIBUTE = "record.count";

    private TestableListenUDPRecord proc;
    private TestRunner runner;
    private MockRecordWriter mockRecordWriter;

    /**
     * Creates a fresh processor and runner, then registers and enables a JSON record reader
     * (using {@link #SCHEMA_TEXT}) and a mock record writer before every test.
     *
     * @throws InitializationException if a controller service cannot be added to the runner
     */
    @BeforeEach
    public void setup() throws InitializationException {
        proc = new TestableListenUDPRecord();
        runner = TestRunners.newTestRunner(proc);
        runner.setProperty(ListenUDP.PORT, "1");

        final JsonTreeReader readerFactory = new JsonTreeReader();
        runner.addControllerService(READER_ID, readerFactory);
        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        runner.setProperty(readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
        runner.enableControllerService(readerFactory);

        mockRecordWriter = new MockRecordWriter(WRITER_HEADER);
        runner.addControllerService(WRITER_ID, mockRecordWriter);
        runner.enableControllerService(mockRecordWriter);

        runner.setProperty(ListenUDPRecord.RECORD_READER, READER_ID);
        runner.setProperty(ListenUDPRecord.RECORD_WRITER, WRITER_ID);
    }

    /** Three queued single-record datagrams with no explicit batch size end up in one flow file of three records. */
    @Test
    public void testSuccessWithBatchSizeGreaterThanAvailableRecords() {
        enqueue(DATAGRAM_1, DATAGRAM_2, DATAGRAM_3);

        runner.run();

        assertSingleSuccessWithRecordCount("3");
    }

    /** With a batch size of 1, each run emits one single-record flow file until the queue is drained. */
    @Test
    public void testSuccessWithBatchLessThanAvailableRecords() {
        final String[] datagrams = {DATAGRAM_1, DATAGRAM_2, DATAGRAM_3};
        enqueue(datagrams);
        runner.setProperty(ListenUDPRecord.BATCH_SIZE, "1");

        // One run per queued datagram; the transfer state is reset so each run is checked in isolation.
        for (int i = 0; i < datagrams.length; i++) {
            runner.run();
            assertSingleSuccessWithRecordCount("1");
            runner.clearTransferState();
        }

        // The queue is now empty, so a further run must not produce anything.
        runner.run();
        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
    }

    /** Two datagrams carrying three records each are combined into one flow file of six records. */
    @Test
    public void testMultipleRecordsPerDatagram() {
        enqueue(MULTI_DATAGRAM_1, MULTI_DATAGRAM_2);

        runner.run();

        assertSingleSuccessWithRecordCount("6");
    }

    /** An unparseable datagram is routed to parse failure with its raw content, while the valid one succeeds. */
    @Test
    public void testParseFailure() {
        final String unparseable = "WILL NOT PARSE";
        enqueue(DATAGRAM_1, unparseable);

        runner.run();

        runner.assertTransferCount(ListenUDPRecord.REL_SUCCESS, 1);
        runner.assertTransferCount(ListenUDPRecord.REL_PARSE_FAILURE, 1);
        final MockFlowFile failed = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_PARSE_FAILURE).get(0);
        failed.assertContentEquals(unparseable);
    }

    /**
     * Replaces the writer with a differently configured {@link MockRecordWriter} and checks that
     * nothing is transferred to either relationship.
     *
     * @throws InitializationException if the replacement writer cannot be added to the runner
     */
    @Test
    public void testWriterFailure() throws InitializationException {
        // NOTE(review): the trailing (false, 2) arguments presumably make the writer fail after
        // two records - confirm against MockRecordWriter.
        mockRecordWriter = new MockRecordWriter(WRITER_HEADER, false, 2);
        runner.addControllerService(WRITER_ID, mockRecordWriter);
        runner.enableControllerService(mockRecordWriter);
        runner.setProperty(ListenUDPRecord.RECORD_WRITER, WRITER_ID);

        enqueue(DATAGRAM_1, DATAGRAM_2, DATAGRAM_3);

        runner.run();

        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_PARSE_FAILURE, 0);
    }

    /** Wraps each payload in an event from {@link #SENDER} and queues it on the processor, in order. */
    private void enqueue(final String... payloads) {
        for (final String payload : payloads) {
            proc.addEvent(new StandardEvent(SENDER, payload.getBytes(StandardCharsets.UTF_8), null));
        }
    }

    /** Asserts that exactly one flow file went to success and that it reports the given record count. */
    private void assertSingleSuccessWithRecordCount(final String expectedCount) {
        runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0);
        flowFile.assertAttributeEquals(RECORD_COUNT_ATTRIBUTE, expectedCount);
    }

    /**
     * Test double for {@link ListenUDPRecord} that returns a mocked dispatcher and serves
     * events from local queues filled by the test.
     */
    private static class TestableListenUDPRecord extends ListenUDPRecord {
        // The queues are never reassigned, so they are final rather than volatile.
        private final BlockingQueue<StandardEvent> testEvents = new LinkedBlockingQueue<>();
        private final BlockingQueue<StandardEvent> testErrorEvents = new LinkedBlockingQueue<>();

        /** Returns a Mockito mock in place of a real dispatcher; both arguments are ignored. */
        @Override
        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<StandardEvent> events) throws IOException {
            return Mockito.mock(ChannelDispatcher.class);
        }

        /** Queues an event to be returned by a later {@link #getMessage} call. */
        public void addEvent(StandardEvent event) {
            testEvents.add(event);
        }

        /** Queues an event on the error queue, which is consulted first when {@code pollErrorQueue} is set. */
        public void addErrorEvent(StandardEvent event) {
            testErrorEvents.add(event);
        }

        /**
         * Returns the next queued event, preferring the error queue when requested.
         *
         * @param longPoll whether to wait up to {@code getLongPollTimeout()} milliseconds on the main queue
         * @param pollErrorQueue whether to check the error queue before the main queue
         * @param session unused by this test double
         * @return the next event, or {@code null} if none is available or the wait was interrupted
         */
        @Override
        protected StandardEvent getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
            final StandardEvent errorEvent = pollErrorQueue ? testErrorEvents.poll() : null;
            if (errorEvent != null) {
                return errorEvent;
            }
            try {
                return longPoll
                        ? testEvents.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS)
                        : testEvents.poll();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the caller can observe the interruption.
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}

