/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.ftpserver.ssl.ClientAuth;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.standard.ListenRELP;
import org.apache.nifi.processors.standard.relp.event.RELPMessage;
import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(value={MockitoExtension.class})
public class TestListenRELP {
    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
    public static final String RELP_FRAME_DATA = "this is a relp message here";
    private static final String LOCALHOST = "localhost";
    private static final Charset CHARSET = StandardCharsets.US_ASCII;
    private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(10L);
    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder().txnr(1L).command("open").dataLength("relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog".length()).data("relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog".getBytes(CHARSET)).build();
    static final RELPFrame RELP_FRAME = new RELPFrame.Builder().txnr(2L).command("syslog").dataLength("this is a relp message here".length()).data("this is a relp message here".getBytes(CHARSET)).build();
    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder().txnr(3L).command("close").dataLength(0).data(new byte[0]).build();
    @Mock
    private RestrictedSSLContextService sslContextService;
    private RELPEncoder encoder;
    private TestRunner runner;

    @BeforeEach
    public void setup() {
        this.encoder = new RELPEncoder(CHARSET);
        MockListenRELP mockRELP = new MockListenRELP();
        this.runner = TestRunners.newTestRunner((Processor)mockRELP);
    }

    @AfterEach
    public void shutdown() {
        this.runner.shutdown();
    }

    @Test
    public void testRELPFramesAreReceivedSuccessfully() throws Exception {
        int relpFrames = 5;
        List<RELPFrame> frames = this.getFrames(5);
        this.run(frames, 5, null);
        List events = this.runner.getProvenanceEvents();
        Assertions.assertNotNull((Object)events);
        Assertions.assertEquals((int)5, (int)events.size());
        ProvenanceEventRecord event = (ProvenanceEventRecord)events.get(0);
        Assertions.assertEquals((Object)ProvenanceEventType.RECEIVE, (Object)event.getEventType());
        Assertions.assertTrue((boolean)event.getTransitUri().toLowerCase().startsWith("relp"), (String)"transit uri must be set and start with proper protocol");
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assertions.assertEquals((int)5, (int)mockFlowFiles.size());
        MockFlowFile mockFlowFile = (MockFlowFile)mockFlowFiles.get(0);
        Assertions.assertEquals((Object)String.valueOf(RELP_FRAME.getTxnr()), (Object)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
        Assertions.assertEquals((Object)RELP_FRAME.getCommand(), (Object)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assertions.assertFalse((boolean)StringUtils.isBlank((CharSequence)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assertions.assertFalse((boolean)StringUtils.isBlank((CharSequence)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    @Test
    public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws Exception {
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
        int relpFrames = 3;
        List<RELPFrame> frames = this.getFrames(3);
        boolean expectedFlowFiles = true;
        this.run(frames, 1, null);
        List events = this.runner.getProvenanceEvents();
        Assertions.assertNotNull((Object)events);
        Assertions.assertEquals((int)1, (int)events.size());
        ProvenanceEventRecord event = (ProvenanceEventRecord)events.get(0);
        Assertions.assertEquals((Object)ProvenanceEventType.RECEIVE, (Object)event.getEventType());
        Assertions.assertTrue((boolean)event.getTransitUri().toLowerCase().startsWith("relp"), (String)"transit uri must be set and start with proper protocol");
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
        Assertions.assertEquals((int)1, (int)mockFlowFiles.size());
        MockFlowFile mockFlowFile = (MockFlowFile)mockFlowFiles.get(0);
        Assertions.assertEquals((Object)RELP_FRAME.getCommand(), (Object)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
        Assertions.assertFalse((boolean)StringUtils.isBlank((CharSequence)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
        Assertions.assertFalse((boolean)StringUtils.isBlank((CharSequence)mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
    }

    @Test
    public void testRunMutualTls() throws Exception {
        String serviceIdentifier = SSLContextService.class.getName();
        Mockito.when((Object)this.sslContextService.getIdentifier()).thenReturn((Object)serviceIdentifier);
        SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
        Mockito.when((Object)this.sslContextService.createContext()).thenReturn((Object)sslContext);
        this.runner.addControllerService(serviceIdentifier, (ControllerService)this.sslContextService);
        this.runner.enableControllerService((ControllerService)this.sslContextService);
        this.runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
        this.runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
        int relpFrames = 3;
        List<RELPFrame> frames = this.getFrames(3);
        this.run(frames, 3, sslContext);
    }

    @Test
    public void testBatchingWithDifferentSenders() {
        String sender1 = "/192.168.1.50:55000";
        String sender2 = "/192.168.1.50:55001";
        String sender3 = "/192.168.1.50:55002";
        ArrayList<RELPMessage> mockEvents = new ArrayList<RELPMessage>();
        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        mockEvents.add(new RELPMessage(sender2, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
        MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
        this.runner = TestRunners.newTestRunner((Processor)mockListenRELP);
        this.runner.setProperty(ListenerProperties.PORT, Integer.toString(NetworkUtils.availablePort()));
        this.runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
        this.runner.shutdown();
    }

    private void run(List<RELPFrame> frames, int flowFiles, SSLContext sslContext) throws Exception {
        int port = NetworkUtils.availablePort();
        this.runner.setProperty(ListenerProperties.PORT, Integer.toString(port));
        this.runner.run(1, false, true);
        byte[] relpMessages = this.getRELPMessages(frames);
        this.sendMessages(port, relpMessages, sslContext);
        this.runner.run(flowFiles, false, false);
        this.runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
    }

    private byte[] getRELPMessages(List<RELPFrame> frames) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        for (RELPFrame frame : frames) {
            byte[] encodedFrame = this.encoder.encode(frame);
            outputStream.write(encodedFrame);
            outputStream.flush();
        }
        return outputStream.toByteArray();
    }

    private List<RELPFrame> getFrames(int relpFrames) {
        ArrayList<RELPFrame> frames = new ArrayList<RELPFrame>();
        frames.add(OPEN_FRAME);
        for (int i = 0; i < relpFrames; ++i) {
            frames.add(RELP_FRAME);
        }
        frames.add(CLOSE_FRAME);
        return frames;
    }

    private void sendMessages(int port, byte[] relpMessages, SSLContext sslContext) throws Exception {
        ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory((ComponentLog)this.runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
        eventSenderFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        eventSenderFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        if (sslContext != null) {
            eventSenderFactory.setSslContext(sslContext);
        }
        eventSenderFactory.setTimeout(SENDER_TIMEOUT);
        try (EventSender eventSender = eventSenderFactory.getEventSender();){
            eventSender.sendEvent((Object)relpMessages);
        }
    }

    private static class MockListenRELP
    extends ListenRELP {
        private final List<RELPMessage> mockEvents;

        public MockListenRELP() {
            this.mockEvents = new ArrayList<RELPMessage>();
        }

        public MockListenRELP(List<RELPMessage> mockEvents) {
            this.mockEvents = mockEvents;
        }

        @OnScheduled
        public void onScheduled(ProcessContext context) throws IOException {
            super.onScheduled(context);
            this.events.addAll(this.mockEvents);
        }
    }
}

