package org.apache.nifi.record.sink.event;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.event.transport.EventSender;
import org.apache.nifi.event.transport.configuration.TransportProtocol;
import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSet;

@CapabilityDescription("Format and send Records as UDP Datagram Packets to a configurable destination")
@Tags({"UDP", "event", "record", "sink"})
/* loaded from: input_file:org/apache/nifi/record/sink/event/UDPEventRecordSink.class */
public class UDPEventRecordSink extends AbstractControllerService implements RecordSinkService {
    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder().name("hostname").displayName("Hostname").description("Destination hostname or IP address").required(true).addValidator(StandardValidators.NON_BLANK_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("port").displayName("Port").description("Destination port number").required(true).addValidator(StandardValidators.PORT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
    public static final PropertyDescriptor SENDER_THREADS = new PropertyDescriptor.Builder().name("sender-threads").displayName("Sender Threads").description("Number of worker threads allocated for handling socket communication").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).defaultValue("2").build();
    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(HOSTNAME, PORT, RECORD_WRITER_FACTORY, SENDER_THREADS));
    private static final String TRANSIT_URI_ATTRIBUTE_KEY = "record.sink.url";
    private static final String TRANSIT_URI_FORMAT = "udp://%s:%d";
    private RecordSetWriterFactory writerFactory;
    private EventSender<byte[]> eventSender;
    private String transitUri;

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTY_DESCRIPTORS;
    }

    @OnEnabled
    public void onEnabled(ConfigurationContext configurationContext) {
        this.writerFactory = configurationContext.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        this.eventSender = getEventSender(configurationContext);
    }

    @OnDisabled
    public void onDisabled() throws Exception {
        if (this.eventSender == null) {
            getLogger().debug("Event Sender not configured");
        } else {
            this.eventSender.close();
        }
    }

    public WriteResult sendData(RecordSet recordSet, Map<String, String> map, boolean z) throws IOException {
        LinkedHashMap linkedHashMap = new LinkedHashMap(map);
        linkedHashMap.put(TRANSIT_URI_ATTRIBUTE_KEY, this.transitUri);
        int i = 0;
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                RecordSetWriter createWriter = this.writerFactory.createWriter(getLogger(), recordSet.getSchema(), byteArrayOutputStream, linkedHashMap);
                while (true) {
                    try {
                        Record next = recordSet.next();
                        if (next == null) {
                            break;
                        }
                        WriteResult write = createWriter.write(next);
                        createWriter.flush();
                        sendRecord(byteArrayOutputStream);
                        i += write.getRecordCount();
                    } catch (Throwable th) {
                        if (createWriter != null) {
                            try {
                                createWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }
                if (createWriter != null) {
                    createWriter.close();
                }
                byteArrayOutputStream.close();
                return WriteResult.of(i, linkedHashMap);
            } finally {
            }
        } catch (SchemaNotFoundException e) {
            throw new IOException("Record Schema not found", e);
        } catch (IOException | RuntimeException e2) {
            throw new IOException(String.format("Record [%d] Destination [%s] Transmission failed", 0, this.transitUri), e2);
        }
    }

    private void sendRecord(ByteArrayOutputStream byteArrayOutputStream) {
        this.eventSender.sendEvent(byteArrayOutputStream.toByteArray());
        byteArrayOutputStream.reset();
    }

    private EventSender<byte[]> getEventSender(ConfigurationContext configurationContext) {
        String value = configurationContext.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue();
        int intValue = configurationContext.getProperty(PORT).evaluateAttributeExpressions().asInteger().intValue();
        this.transitUri = String.format(TRANSIT_URI_FORMAT, value, Integer.valueOf(intValue));
        ByteArrayNettyEventSenderFactory byteArrayNettyEventSenderFactory = new ByteArrayNettyEventSenderFactory(getLogger(), value, intValue, TransportProtocol.UDP);
        byteArrayNettyEventSenderFactory.setShutdownQuietPeriod(Duration.ZERO);
        byteArrayNettyEventSenderFactory.setShutdownTimeout(Duration.ZERO);
        byteArrayNettyEventSenderFactory.setThreadNamePrefix(String.format("%s[%s]", getClass().getSimpleName(), getIdentifier()));
        byteArrayNettyEventSenderFactory.setWorkerThreads(configurationContext.getProperty(SENDER_THREADS).evaluateAttributeExpressions().asInteger().intValue());
        return byteArrayNettyEventSenderFactory.getEventSender();
    }
}
