/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.DelimitedInputStream;
import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.event.transport.netty.StreamingNettyEventSenderFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processors.standard.ListenTCP;
import org.apache.nifi.processors.standard.PutUDP;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription(value="The PutTCP processor receives a FlowFile and transmits the FlowFile content over a TCP connection to the configured TCP server. By default, the FlowFiles are transmitted over the same TCP connection (or pool of TCP connections if multiple input threads are configured). To assist the TCP server with determining message boundaries, an optional \"Outgoing Message Delimiter\" string can be configured which is appended to the end of each FlowFiles content when it is transmitted over the TCP connection. An optional \"Connection Per FlowFile\" parameter can be specified to change the behaviour so that each FlowFiles content is transmitted over a single TCP connection which is opened when the FlowFile is received and closed after the FlowFile has been sent. This option should only be used for low message volume scenarios, otherwise the platform may run out of TCP sockets.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso(value={ListenTCP.class, PutUDP.class})
@Tags(value={"remote", "egress", "put", "tcp"})
@SupportsBatching
public class PutTCP
extends AbstractPutEventProcessor<InputStream> {
    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(CONNECTION_PER_FLOWFILE, OUTGOING_MESSAGE_DELIMITER, TIMEOUT, SSL_CONTEXT_SERVICE, CHARSET);
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        try {
            session.read(flowFile, inputStream -> {
                InputStream inputStreamEvent = inputStream;
                String delimiter = this.getOutgoingMessageDelimiter(context, flowFile);
                if (delimiter != null) {
                    Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
                    inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
                }
                this.eventSender.sendEvent((Object)inputStreamEvent);
            });
            session.getProvenanceReporter().send(flowFile, this.transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(flowFile, REL_SUCCESS);
            session.commitAsync();
        }
        catch (Exception e) {
            this.getLogger().error("Send Failed {}", new Object[]{flowFile, e});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            session.commitAsync();
            context.yield();
        }
    }

    protected String getProtocol(ProcessContext context) {
        return TCP_VALUE.getValue();
    }

    protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(String hostname, int port, String protocol) {
        return new StreamingNettyEventSenderFactory(this.getLogger(), hostname, port, TransportProtocol.TCP);
    }
}

