/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.standard;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.DelimitedInputStream;
import org.apache.nifi.event.transport.netty.NettyEventSenderFactory;
import org.apache.nifi.event.transport.netty.StreamingNettyEventSenderFactory;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.put.AbstractPutEventProcessor;
import org.apache.nifi.processors.standard.ListenTCP;
import org.apache.nifi.processors.standard.PutUDP;
import org.apache.nifi.processors.standard.property.TransmissionStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch;

@CapabilityDescription(value="Sends serialized FlowFiles or Records over TCP to a configurable destination with optional support for TLS")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@SeeAlso(value={ListenTCP.class, PutUDP.class})
@Tags(value={"remote", "egress", "put", "tcp"})
@SupportsBatching
@WritesAttributes(value={@WritesAttribute(attribute="record.count.transmitted", description="Count of records transmitted to configured destination address")})
public class PutTCP
extends AbstractPutEventProcessor<InputStream> {
    public static final String RECORD_COUNT_TRANSMITTED = "record.count.transmitted";
    static final PropertyDescriptor TRANSMISSION_STRATEGY = new PropertyDescriptor.Builder().name("Transmission Strategy").displayName("Transmission Strategy").description("Specifies the strategy used for reading input FlowFiles and transmitting messages to the destination socket address").required(true).allowableValues(TransmissionStrategy.class).defaultValue(TransmissionStrategy.FLOWFILE_ORIENTED.getValue()).build();
    static final PropertyDescriptor DEPENDENT_CHARSET = new PropertyDescriptor.Builder().fromPropertyDescriptor(CHARSET).dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue(), new String[0]).build();
    static final PropertyDescriptor DEPENDENT_OUTGOING_MESSAGE_DELIMITER = new PropertyDescriptor.Builder().fromPropertyDescriptor(OUTGOING_MESSAGE_DELIMITER).dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.FLOWFILE_ORIENTED.getValue(), new String[0]).build();
    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("Record Reader").displayName("Record Reader").description("Specifies the Controller Service to use for reading Records from input FlowFiles").identifiesControllerService(RecordReaderFactory.class).required(true).dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue(), new String[0]).build();
    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").displayName("Record Writer").description("Specifies the Controller Service to use for writing Records to the configured socket address").identifiesControllerService(RecordSetWriterFactory.class).required(true).dependsOn(TRANSMISSION_STRATEGY, TransmissionStrategy.RECORD_ORIENTED.getValue(), new String[0]).build();
    private static final List<PropertyDescriptor> ADDITIONAL_PROPERTIES = Collections.unmodifiableList(Arrays.asList(CONNECTION_PER_FLOWFILE, SSL_CONTEXT_SERVICE, TRANSMISSION_STRATEGY, DEPENDENT_OUTGOING_MESSAGE_DELIMITER, DEPENDENT_CHARSET, RECORD_READER, RECORD_WRITER));

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return ADDITIONAL_PROPERTIES;
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        ProcessSession session = sessionFactory.createSession();
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        TransmissionStrategy transmissionStrategy = TransmissionStrategy.valueOf(context.getProperty(TRANSMISSION_STRATEGY).getValue());
        StopWatch stopWatch = new StopWatch(true);
        try {
            int recordCount;
            if (TransmissionStrategy.RECORD_ORIENTED == transmissionStrategy) {
                recordCount = this.sendRecords(context, session, flowFile);
            } else {
                this.sendFlowFile(context, session, flowFile);
                recordCount = 0;
            }
            FlowFile processedFlowFile = session.putAttribute(flowFile, RECORD_COUNT_TRANSMITTED, Integer.toString(recordCount));
            session.getProvenanceReporter().send(processedFlowFile, this.transitUri, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
            session.transfer(processedFlowFile, REL_SUCCESS);
            session.commitAsync();
        }
        catch (Exception e) {
            this.getLogger().error("Send Failed {}", new Object[]{flowFile, e});
            session.transfer(session.penalize(flowFile), REL_FAILURE);
            session.commitAsync();
            context.yield();
        }
    }

    protected String getProtocol(ProcessContext context) {
        return TCP_VALUE.getValue();
    }

    protected NettyEventSenderFactory<InputStream> getNettyEventSenderFactory(String hostname, int port, String protocol) {
        return new StreamingNettyEventSenderFactory(this.getLogger(), hostname, port, TransportProtocol.TCP);
    }

    private void sendFlowFile(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        session.read(flowFile, inputStream -> {
            InputStream inputStreamEvent = inputStream;
            String delimiter = this.getOutgoingMessageDelimiter(context, flowFile);
            if (delimiter != null) {
                Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
                inputStreamEvent = new DelimitedInputStream(inputStream, delimiter.getBytes(charSet));
            }
            this.eventSender.sendEvent((Object)inputStreamEvent);
        });
    }

    private int sendRecords(ProcessContext context, ProcessSession session, FlowFile flowFile) {
        AtomicInteger recordCount = new AtomicInteger();
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        session.read(flowFile, inputStream -> {
            try (RecordReader recordReader = readerFactory.createRecordReader(flowFile, inputStream, this.getLogger());
                 ReusableByteArrayInputStream eventInputStream = new ReusableByteArrayInputStream();
                 ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                 RecordSetWriter recordSetWriter = writerFactory.createWriter(this.getLogger(), recordReader.getSchema(), (OutputStream)outputStream, flowFile);){
                Record record;
                while ((record = recordReader.nextRecord()) != null) {
                    recordSetWriter.write(record);
                    recordSetWriter.flush();
                    byte[] buffer = outputStream.toByteArray();
                    eventInputStream.setBuffer(buffer);
                    this.eventSender.sendEvent((Object)eventInputStream);
                    outputStream.reset();
                    recordCount.getAndIncrement();
                }
            }
            catch (SchemaNotFoundException | MalformedRecordException e) {
                throw new IOException("Record reading failed", e);
            }
        });
        return recordCount.get();
    }

    private static class ReusableByteArrayInputStream
    extends ByteArrayInputStream {
        private ReusableByteArrayInputStream() {
            super(new byte[0]);
        }

        private void setBuffer(byte[] buffer) {
            this.buf = buffer;
            this.pos = 0;
            this.count = buffer.length;
        }
    }
}

