/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.PutTCP;
import org.apache.nifi.processors.standard.property.TransmissionStrategy;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@Timeout(value=30L)
@ExtendWith(value={MockitoExtension.class})
public class TestPutTCP {
    private static final String TCP_SERVER_ADDRESS = "127.0.0.1";
    private static final String SERVER_VARIABLE = "server.address";
    private static final String TCP_SERVER_ADDRESS_EL = "${server.address}";
    private static final int VALID_LARGE_FILE_SIZE = 32768;
    private static final int VALID_SMALL_FILE_SIZE = 64;
    private static final int LOAD_TEST_ITERATIONS = 500;
    private static final int LOAD_TEST_THREAD_COUNT = 1;
    private static final int DEFAULT_ITERATIONS = 1;
    private static final int DEFAULT_THREAD_COUNT = 1;
    private static final char CONTENT_CHAR = 'x';
    private static final String OUTGOING_MESSAGE_DELIMITER = "\n";
    private static final String OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR = "{delimiter}\r\n";
    private static final String[] EMPTY_FILE = new String[]{""};
    private static final String[] VALID_FILES = new String[]{"abcdefghijklmnopqrstuvwxyz", "zyxwvutsrqponmlkjihgfedcba", "12345678", "343424222", "!@\u00a3$%^&*()_+:|{}[];\\"};
    private static final String WRITER_SERVICE_ID = RecordSetWriterFactory.class.getSimpleName();
    private static final String READER_SERVICE_ID = RecordReaderFactory.class.getSimpleName();
    private static final String RECORD = String.class.getSimpleName();
    private static final Record NULL_RECORD = null;
    @Mock
    private RecordSetWriterFactory writerFactory;
    @Mock
    private RecordReaderFactory readerFactory;
    @Mock
    private RecordReader recordReader;
    @Mock
    private Record record;
    private EventServer eventServer;
    private TestRunner runner;
    private BlockingQueue<ByteArrayMessage> messages;

    @BeforeEach
    public void setup() throws Exception {
        this.runner = TestRunners.newTestRunner(PutTCP.class);
        this.runner.setVariable(SERVER_VARIABLE, TCP_SERVER_ADDRESS);
    }

    @AfterEach
    public void cleanup() {
        this.runner.shutdown();
        this.shutdownServer();
    }

    @Test
    public void testRunSuccess() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessSslContextService() throws Exception {
        TlsConfiguration tlsConfiguration = new TemporaryKeyStoreBuilder().build();
        SSLContext sslContext = SslContextUtils.createSslContext(tlsConfiguration);
        Assertions.assertNotNull((Object)sslContext, (String)"SSLContext not found");
        String identifier = SSLContextService.class.getName();
        SSLContextService sslContextService = (SSLContextService)Mockito.mock(SSLContextService.class);
        Mockito.when((Object)sslContextService.getIdentifier()).thenReturn((Object)identifier);
        Mockito.when((Object)sslContextService.createContext()).thenReturn((Object)sslContext);
        this.runner.addControllerService(identifier, (ControllerService)sslContextService);
        this.runner.enableControllerService((ControllerService)sslContextService);
        this.runner.setProperty(PutTCP.SSL_CONTEXT_SERVICE, identifier);
        this.createTestServer(sslContext, OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessServerVariableExpression() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS_EL, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessPruneSenders() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertTransfers(VALID_FILES.length);
        this.assertMessagesReceived(VALID_FILES);
        this.runner.setProperty(PutTCP.IDLE_EXPIRATION, "500 ms");
        this.runner.run(1, false, false);
        this.runner.clearTransferState();
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessMultiCharDelimiter() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER_MULTI_CHAR, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionPerFlowFile() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessConnectionFailure() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
        this.shutdownServer();
        this.sendTestData(VALID_FILES);
        this.runner.assertQueueEmpty();
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(VALID_FILES);
        this.assertMessagesReceived(VALID_FILES);
    }

    @Test
    public void testRunSuccessEmptyFile() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        this.sendTestData(EMPTY_FILE);
        this.assertTransfers(1);
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testRunSuccessLargeValidFile() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, true);
        String[] testData = this.createContent(32768);
        this.sendTestData(testData);
        this.assertMessagesReceived(testData);
    }

    @Test
    public void testRunSuccessFiveHundredMessages() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.configureProperties(TCP_SERVER_ADDRESS, OUTGOING_MESSAGE_DELIMITER, false);
        String[] testData = this.createContent(64);
        this.sendTestData(testData, 500, 1);
        this.assertMessagesReceived(testData, 500);
    }

    @Test
    void testRunSuccessRecordOriented() throws Exception {
        this.createTestServer(OUTGOING_MESSAGE_DELIMITER);
        this.runner.setProperty(PutTCP.HOSTNAME, TCP_SERVER_ADDRESS);
        this.runner.setProperty(PutTCP.PORT, String.valueOf(this.eventServer.getListeningPort()));
        this.runner.setProperty(PutTCP.TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue());
        Mockito.when((Object)this.writerFactory.getIdentifier()).thenReturn((Object)WRITER_SERVICE_ID);
        this.runner.addControllerService(WRITER_SERVICE_ID, (ControllerService)this.writerFactory);
        this.runner.enableControllerService((ControllerService)this.writerFactory);
        this.runner.setProperty(PutTCP.RECORD_WRITER, WRITER_SERVICE_ID);
        Mockito.when((Object)this.readerFactory.getIdentifier()).thenReturn((Object)READER_SERVICE_ID);
        this.runner.addControllerService(READER_SERVICE_ID, (ControllerService)this.readerFactory);
        this.runner.enableControllerService((ControllerService)this.readerFactory);
        this.runner.setProperty(PutTCP.RECORD_READER, READER_SERVICE_ID);
        Mockito.when((Object)this.readerFactory.createRecordReader((FlowFile)ArgumentMatchers.any(), (InputStream)ArgumentMatchers.any(), (ComponentLog)ArgumentMatchers.any())).thenReturn((Object)this.recordReader);
        Mockito.when((Object)this.recordReader.nextRecord()).thenReturn((Object)this.record, (Object[])new Record[]{NULL_RECORD});
        Mockito.when((Object)this.writerFactory.createWriter((ComponentLog)ArgumentMatchers.any(), (RecordSchema)ArgumentMatchers.any(), (OutputStream)ArgumentMatchers.any(OutputStream.class), (FlowFile)ArgumentMatchers.any(FlowFile.class))).thenAnswer(invocationOnMock -> {
            OutputStream outputStream = (OutputStream)invocationOnMock.getArgument(2, OutputStream.class);
            return new MockRecordSetWriter(outputStream);
        });
        this.runner.enqueue(RECORD);
        this.runner.run();
        this.runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
        Iterator successFlowFiles = this.runner.getFlowFilesForRelationship(PutTCP.REL_SUCCESS).iterator();
        Assertions.assertTrue((boolean)successFlowFiles.hasNext(), (String)"Success FlowFiles not found");
        MockFlowFile successFlowFile = (MockFlowFile)successFlowFiles.next();
        successFlowFile.assertAttributeEquals("record.count.transmitted", Integer.toString(1));
        List provenanceEventRecords = this.runner.getProvenanceEvents();
        Optional<ProvenanceEventRecord> sendEventFound = provenanceEventRecords.stream().filter(eventRecord -> ProvenanceEventType.SEND == eventRecord.getEventType()).findFirst();
        Assertions.assertTrue((boolean)sendEventFound.isPresent(), (String)"Provenance Send Event not found");
        ProvenanceEventRecord sendEventRecord = sendEventFound.get();
        Assertions.assertTrue((boolean)sendEventRecord.getTransitUri().contains(TCP_SERVER_ADDRESS), (String)"Transit URI not matched");
        ByteArrayMessage message = this.messages.take();
        Assertions.assertNotNull((Object)message);
        Assertions.assertArrayEquals((byte[])RECORD.getBytes(StandardCharsets.UTF_8), (byte[])message.getMessage());
        Assertions.assertEquals((Object)TCP_SERVER_ADDRESS, (Object)message.getSender());
    }

    private void createTestServer(String delimiter) throws UnknownHostException {
        this.createTestServer(null, delimiter);
    }

    private void createTestServer(SSLContext sslContext, String delimiter) throws UnknownHostException {
        this.messages = new LinkedBlockingQueue<ByteArrayMessage>();
        InetAddress listenAddress = InetAddress.getByName(TCP_SERVER_ADDRESS);
        ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory((ComponentLog)this.runner.getLogger(), listenAddress, 0, TransportProtocol.TCP, delimiter.getBytes(), 32768, this.messages);
        if (sslContext != null) {
            serverFactory.setSslContext(sslContext);
        }
        serverFactory.setShutdownQuietPeriod(Duration.ZERO);
        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        this.eventServer = serverFactory.getEventServer();
    }

    private void shutdownServer() {
        if (this.eventServer != null) {
            this.eventServer.shutdown();
        }
    }

    private void configureProperties(String host, String outgoingMessageDelimiter, boolean connectionPerFlowFile) {
        this.runner.setProperty(PutTCP.HOSTNAME, host);
        this.runner.setProperty(PutTCP.PORT, String.valueOf(this.eventServer.getListeningPort()));
        if (outgoingMessageDelimiter != null) {
            this.runner.setProperty(PutTCP.OUTGOING_MESSAGE_DELIMITER, outgoingMessageDelimiter);
        }
        this.runner.setProperty(PutTCP.CONNECTION_PER_FLOWFILE, String.valueOf(connectionPerFlowFile));
        this.runner.assertValid();
    }

    private void sendTestData(String[] testData) {
        this.sendTestData(testData, 1, 1);
    }

    private void sendTestData(String[] testData, int iterations, int threadCount) {
        this.runner.setThreadCount(threadCount);
        for (int i = 0; i < iterations; ++i) {
            for (String item : testData) {
                this.runner.enqueue(item.getBytes());
            }
            this.runner.run(testData.length, false, i == 0);
        }
    }

    private void assertTransfers(int successCount) {
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, successCount);
        this.runner.assertTransferCount(PutTCP.REL_FAILURE, 0);
    }

    private void assertMessagesReceived(String[] sentData) throws Exception {
        this.assertMessagesReceived(sentData, 1);
        this.runner.assertQueueEmpty();
    }

    private void assertMessagesReceived(String[] sentData, int iterations) throws Exception {
        for (int i = 0; i < iterations; ++i) {
            for (String item : sentData) {
                ByteArrayMessage message = this.messages.take();
                Assertions.assertNotNull((Object)message, (String)String.format("Message [%d] not found", i));
                Assertions.assertTrue((boolean)Arrays.asList(sentData).contains(new String(message.getMessage())));
            }
        }
        this.runner.assertTransferCount(PutTCP.REL_SUCCESS, sentData.length * iterations);
        this.runner.clearTransferState();
        Assertions.assertNull(this.messages.poll(), (String)"Unexpected extra messages found");
    }

    private String[] createContent(int size) {
        char[] content = new char[size];
        Arrays.fill(content, 'x');
        return new String[]{new String(content)};
    }

    private static class MockRecordSetWriter
    implements RecordSetWriter {
        private final OutputStream outputStream;

        private MockRecordSetWriter(OutputStream outputStream) {
            this.outputStream = outputStream;
        }

        public WriteResult write(RecordSet recordSet) {
            return WriteResult.EMPTY;
        }

        public void beginRecordSet() {
        }

        public WriteResult finishRecordSet() {
            return WriteResult.EMPTY;
        }

        public WriteResult write(Record record) throws IOException {
            this.outputStream.write(RECORD.getBytes(StandardCharsets.UTF_8));
            this.outputStream.write(TestPutTCP.OUTGOING_MESSAGE_DELIMITER.getBytes(StandardCharsets.UTF_8));
            return WriteResult.of((int)1, Collections.emptyMap());
        }

        public String getMimeType() {
            return null;
        }

        public void flush() {
        }

        public void close() {
        }
    }
}

