/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.standard.ListenTCPRecord;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.web.util.ssl.SslContextUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestListenTCPRecord {
    static final String SCHEMA_TEXT = "{\n  \"name\": \"syslogRecord\",\n  \"namespace\": \"nifi\",\n  \"type\": \"record\",\n  \"fields\": [\n    { \"name\": \"timestamp\", \"type\": \"string\" },\n    { \"name\": \"logsource\", \"type\": \"string\" },\n    { \"name\": \"message\", \"type\": \"string\" }\n  ]\n}";
    static final String DATA = "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}]";
    private static final Logger LOGGER = LoggerFactory.getLogger(TestListenTCPRecord.class);
    private static final long TEST_TIMEOUT = 30L;
    private static final String LOCALHOST = "localhost";
    private static final String SSL_CONTEXT_IDENTIFIER = SSLContextService.class.getName();
    private static SSLContext keyStoreSslContext;
    private static SSLContext trustStoreSslContext;
    private TestRunner runner;

    @BeforeAll
    public static void configureServices() throws TlsException {
        keyStoreSslContext = SslContextUtils.createKeyStoreSslContext();
        trustStoreSslContext = SslContextUtils.createTrustStoreSslContext();
    }

    @BeforeEach
    public void setup() throws InitializationException {
        this.runner = TestRunners.newTestRunner(ListenTCPRecord.class);
        String readerId = "record-reader";
        JsonTreeReader readerFactory = new JsonTreeReader();
        this.runner.addControllerService("record-reader", (ControllerService)readerFactory);
        this.runner.setProperty((ControllerService)readerFactory, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        this.runner.setProperty((ControllerService)readerFactory, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
        this.runner.enableControllerService((ControllerService)readerFactory);
        String writerId = "record-writer";
        MockRecordWriter writerFactory = new MockRecordWriter("timestamp, logsource, message");
        this.runner.addControllerService("record-writer", (ControllerService)writerFactory);
        this.runner.enableControllerService((ControllerService)writerFactory);
        this.runner.setProperty(ListenTCPRecord.RECORD_READER, "record-reader");
        this.runner.setProperty(ListenTCPRecord.RECORD_WRITER, "record-writer");
    }

    @Test
    public void testCustomValidate() throws InitializationException {
        this.runner.setProperty(ListenTCPRecord.PORT, "1");
        this.runner.assertValid();
        this.enableSslContextService(keyStoreSslContext);
        this.runner.setProperty(ListenTCPRecord.CLIENT_AUTH, "");
        this.runner.assertNotValid();
        this.runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
        this.runner.assertValid();
    }

    @Test
    @Timeout(value=30L)
    public void testRunOneRecordPerFlowFile() throws IOException, InterruptedException {
        this.runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "1");
        this.run(3, null);
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        for (int i = 0; i < mockFlowFiles.size(); ++i) {
            MockFlowFile flowFile = (MockFlowFile)mockFlowFiles.get(i);
            flowFile.assertAttributeEquals("record.count", "1");
            String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
            Assertions.assertNotNull((Object)content);
            Assertions.assertTrue((boolean)content.contains("This is a test " + (i + 1)));
        }
    }

    @Test
    @Timeout(value=30L)
    public void testRunMultipleRecordsPerFlowFileLessThanBatchSize() throws IOException, InterruptedException {
        this.runner.setProperty(ListenTCPRecord.RECORD_BATCH_SIZE, "5");
        this.run(1, null);
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assertions.assertEquals((int)1, (int)mockFlowFiles.size());
        MockFlowFile flowFile = (MockFlowFile)mockFlowFiles.get(0);
        flowFile.assertAttributeEquals("record.count", "3");
        String content = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
        Assertions.assertNotNull((Object)content);
        Assertions.assertTrue((boolean)content.contains("This is a test 1"));
        Assertions.assertTrue((boolean)content.contains("This is a test 2"));
        Assertions.assertTrue((boolean)content.contains("This is a test 3"));
    }

    @Test
    @Timeout(value=30L)
    public void testRunClientAuthRequired() throws InitializationException, IOException, InterruptedException {
        this.runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
        this.enableSslContextService(keyStoreSslContext);
        this.run(1, keyStoreSslContext);
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assertions.assertEquals((int)1, (int)mockFlowFiles.size());
        String content = new String(((MockFlowFile)mockFlowFiles.get(0)).toByteArray(), StandardCharsets.UTF_8);
        Assertions.assertNotNull((Object)content);
        Assertions.assertTrue((boolean)content.contains("This is a test 1"));
        Assertions.assertTrue((boolean)content.contains("This is a test 2"));
        Assertions.assertTrue((boolean)content.contains("This is a test 3"));
    }

    @Test
    @Timeout(value=30L)
    public void testRunSSLClientDNsAddedAsAttributes() throws InitializationException, IOException, InterruptedException {
        this.runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.REQUIRED.name());
        this.enableSslContextService(keyStoreSslContext);
        this.run(1, keyStoreSslContext);
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assertions.assertEquals((int)1, (int)mockFlowFiles.size());
        MockFlowFile flowFile = (MockFlowFile)mockFlowFiles.get(0);
        flowFile.assertAttributeEquals("client.certificate.subject.dn", "CN=localhost");
        flowFile.assertAttributeEquals("client.certificate.issuer.dn", "CN=localhost");
    }

    @Test
    @Timeout(value=30L)
    public void testRunClientAuthNone() throws InitializationException, IOException, InterruptedException {
        this.runner.setProperty(ListenTCPRecord.CLIENT_AUTH, ClientAuth.NONE.name());
        this.enableSslContextService(keyStoreSslContext);
        this.run(1, trustStoreSslContext);
        List mockFlowFiles = this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS);
        Assertions.assertEquals((int)1, (int)mockFlowFiles.size());
        String content = new String(((MockFlowFile)mockFlowFiles.get(0)).toByteArray(), StandardCharsets.UTF_8);
        Assertions.assertNotNull((Object)content);
        Assertions.assertTrue((boolean)content.contains("This is a test 1"));
        Assertions.assertTrue((boolean)content.contains("This is a test 2"));
        Assertions.assertTrue((boolean)content.contains("This is a test 3"));
    }

    protected void run(int expectedTransferred, SSLContext sslContext) throws IOException, InterruptedException {
        int port = NetworkUtils.availablePort();
        this.runner.setProperty(ListenTCPRecord.PORT, Integer.toString(port));
        this.runner.run(1, false, true);
        Thread thread = new Thread(() -> {
            try (Socket socket = this.getSocket(port, sslContext);){
                OutputStream outputStream = socket.getOutputStream();
                outputStream.write(DATA.getBytes(StandardCharsets.UTF_8));
                outputStream.flush();
            }
            catch (IOException e) {
                LOGGER.error("Failed Sending Records to Port [{}]", (Object)port, (Object)e);
            }
        });
        thread.start();
        int iterations = 0;
        while (this.getSuccessCount() < expectedTransferred) {
            this.runner.run(1, false, false);
            ++iterations;
            Optional firstErrorMessage = this.runner.getLogger().getErrorMessages().stream().findFirst();
            Assertions.assertNull(firstErrorMessage.orElse(null));
        }
        LOGGER.info("Completed after iterations [{}]", (Object)iterations);
    }

    private int getSuccessCount() {
        return this.runner.getFlowFilesForRelationship(ListenTCPRecord.REL_SUCCESS).size();
    }

    private Socket getSocket(int port, SSLContext sslContext) throws IOException {
        Socket socket = sslContext == null ? new Socket(LOCALHOST, port) : sslContext.getSocketFactory().createSocket(LOCALHOST, port);
        return socket;
    }

    private void enableSslContextService(SSLContext sslContext) throws InitializationException {
        RestrictedSSLContextService sslContextService = (RestrictedSSLContextService)Mockito.mock(RestrictedSSLContextService.class);
        Mockito.when((Object)sslContextService.getIdentifier()).thenReturn((Object)SSL_CONTEXT_IDENTIFIER);
        Mockito.when((Object)sslContextService.createContext()).thenReturn((Object)sslContext);
        this.runner.addControllerService(SSL_CONTEXT_IDENTIFIER, (ControllerService)sslContextService);
        this.runner.enableControllerService((ControllerService)sslContextService);
        this.runner.setProperty(ListenTCPRecord.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
    }
}

