/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.standard.PutSyslog;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TestPutSyslog {
    private static final String ADDRESS = "127.0.0.1";
    private static final String LOCALHOST = "localhost";
    private static final String MESSAGE_BODY = String.class.getName();
    private static final String MESSAGE_PRIORITY = "1";
    private static final String DEFAULT_PROTOCOL = "UDP";
    private static final String TIMESTAMP = "Jan 1 00:00:00";
    private static final String VERSION = "2";
    private static final String SYSLOG_MESSAGE = String.format("<%s>%s %s %s", "1", "Jan 1 00:00:00", "localhost", MESSAGE_BODY);
    private static final String VERSION_SYSLOG_MESSAGE = String.format("<%s>%s %s %s %s", "1", "2", "Jan 1 00:00:00", "localhost", MESSAGE_BODY);
    private static final int MAX_FRAME_LENGTH = 1024;
    private static final Charset CHARSET = StandardCharsets.UTF_8;
    private static final String DELIMITER = "\n";
    private static final int POLL_TIMEOUT_SECONDS = 5;
    private TestRunner runner;
    private final TransportProtocol protocol = TransportProtocol.UDP;
    private InetAddress address;
    private int port;

    @BeforeEach
    public void setRunner() throws UnknownHostException {
        this.address = InetAddress.getByName(ADDRESS);
        this.port = NetworkUtils.getAvailableUdpPort();
        this.runner = TestRunners.newTestRunner(PutSyslog.class);
        this.runner.setProperty(PutSyslog.HOSTNAME, ADDRESS);
        this.runner.setProperty(PutSyslog.PROTOCOL, this.protocol.toString());
        this.runner.setProperty(PutSyslog.PORT, Integer.toString(this.port));
        this.runner.setProperty(PutSyslog.MSG_BODY, MESSAGE_BODY);
        this.runner.setProperty(PutSyslog.MSG_PRIORITY, MESSAGE_PRIORITY);
        this.runner.setProperty(PutSyslog.MSG_HOSTNAME, LOCALHOST);
        this.runner.setProperty(PutSyslog.MSG_TIMESTAMP, TIMESTAMP);
        this.runner.assertValid();
    }

    @Test
    public void testRunNoFlowFiles() {
        this.runner.run();
        this.runner.assertQueueEmpty();
    }

    @Test
    public void testRunSuccess() throws InterruptedException {
        this.assertSyslogMessageSuccess(SYSLOG_MESSAGE, Collections.emptyMap());
    }

    @Test
    public void testRunSuccessSyslogVersion() throws InterruptedException {
        String versionAttributeKey = "version";
        this.runner.setProperty(PutSyslog.MSG_VERSION, String.format("${%s}", "version"));
        Map<String, String> attributes = Collections.singletonMap("version", VERSION);
        this.assertSyslogMessageSuccess(VERSION_SYSLOG_MESSAGE, attributes);
    }

    @Test
    public void testRunInvalid() {
        this.runner.setProperty(PutSyslog.MSG_PRIORITY, Integer.toString(Integer.MAX_VALUE));
        this.runner.enqueue(new byte[0]);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(PutSyslog.REL_INVALID);
    }

    @Test
    public void testRunFailure() {
        this.runner.setProperty(PutSyslog.PROTOCOL, PutSyslog.TCP_VALUE);
        this.runner.setProperty(PutSyslog.PORT, Integer.toString(NetworkUtils.getAvailableTcpPort()));
        this.runner.enqueue(new byte[0]);
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(PutSyslog.REL_FAILURE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void assertSyslogMessageSuccess(String expectedSyslogMessage, Map<String, String> attributes) throws InterruptedException {
        LinkedBlockingQueue messages = new LinkedBlockingQueue();
        byte[] delimiter = DELIMITER.getBytes(CHARSET);
        ByteArrayMessageNettyEventServerFactory serverFactory = new ByteArrayMessageNettyEventServerFactory((ComponentLog)this.runner.getLogger(), this.address, this.port, this.protocol, delimiter, 1024, messages);
        serverFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        serverFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        EventServer eventServer = serverFactory.getEventServer();
        try {
            this.runner.enqueue(expectedSyslogMessage, attributes);
            this.runner.run();
            ByteArrayMessage message = (ByteArrayMessage)messages.poll(5L, TimeUnit.SECONDS);
            String syslogMessage = new String(message.getMessage(), CHARSET);
            this.runner.assertAllFlowFilesTransferred(PutSyslog.REL_SUCCESS);
            Assertions.assertEquals((Object)expectedSyslogMessage, (Object)syslogMessage);
            this.assertProvenanceRecordTransitUriFound();
        }
        finally {
            eventServer.shutdown();
        }
    }

    private void assertProvenanceRecordTransitUriFound() {
        List provenanceEvents = this.runner.getProvenanceEvents();
        Assertions.assertFalse((boolean)provenanceEvents.isEmpty(), (String)"Provenance Events not found");
        ProvenanceEventRecord provenanceEventRecord = (ProvenanceEventRecord)provenanceEvents.iterator().next();
        Assertions.assertEquals((Object)ProvenanceEventType.SEND, (Object)provenanceEventRecord.getEventType());
        String transitUri = provenanceEventRecord.getTransitUri();
        Assertions.assertNotNull((Object)transitUri, (String)"Transit URI not found");
        Assertions.assertTrue((boolean)transitUri.contains(DEFAULT_PROTOCOL), (String)"Transit URI Protocol not found");
        Assertions.assertTrue((boolean)transitUri.contains(ADDRESS), (String)"Transit URI Hostname not found");
        Assertions.assertTrue((boolean)transitUri.contains(Integer.toString(this.port)), (String)"Transit URI Port not found");
    }
}

