/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.reporting;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonBuilderFactory;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonValue;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.reporting.AbstractSiteToSiteReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.s2s.SiteToSiteUtils;
import org.apache.nifi.reporting.util.provenance.ProvenanceEventConsumer;

@Tags(value={"provenance", "lineage", "tracking", "site", "site to site"})
@CapabilityDescription(value="Publishes Provenance events using the Site To Site protocol.")
@Stateful(scopes={Scope.LOCAL}, description="Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@Restricted(restrictions={@Restriction(requiredPermission=RequiredPermission.EXPORT_NIFI_DETAILS, explanation="Provides operator the ability to send sensitive details contained in Provenance events to any external system.")})
public class SiteToSiteProvenanceReportingTask
extends AbstractSiteToSiteReportingTask {
    static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", "Start reading provenance Events from the beginning of the stream (the oldest event first)");
    static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", "Start reading provenance Events from the end of the stream, ignoring old events");
    static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder().name("s2s-prov-task-event-filter").displayName("Event Type to Include").description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder().name("s2s-prov-task-event-filter-exclude").displayName("Event Type to Exclude").description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder().name("s2s-prov-task-type-filter").displayName("Component Type to Include").description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder().name("s2s-prov-task-type-filter-exclude").displayName("Component Type to Exclude").description("Regular expression to exclude the provenance events based on the component type. The events matching the regular expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder().name("s2s-prov-task-id-filter").displayName("Component ID to Include").description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder().name("s2s-prov-task-id-filter-exclude").displayName("Component ID to Exclude").description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_NAME = new PropertyDescriptor.Builder().name("s2s-prov-task-name-filter").displayName("Component Name to Include").description("Regular expression to filter the provenance events based on the component name. Only the events matching the regular expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    static final PropertyDescriptor FILTER_COMPONENT_NAME_EXCLUDE = new PropertyDescriptor.Builder().name("s2s-prov-task-name-filter-exclude").displayName("Component Name to Exclude").description("Regular expression to exclude the provenance events based on the component name. The events matching the regular expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component name is included in Component Name to Include and excluded here, then the exclusion takes precedence and the event will not be sent.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder().name("start-position").displayName("Start Position").description("If the Reporting Task has never been run, or if its state has been reset by a user, specifies where in the stream of Provenance Events the Reporting Task should start").allowableValues(new AllowableValue[]{BEGINNING_OF_STREAM, END_OF_STREAM}).defaultValue(BEGINNING_OF_STREAM.getValue()).required(true).build();
    private volatile ProvenanceEventConsumer consumer;

    public SiteToSiteProvenanceReportingTask() throws IOException {
        InputStream schema = ((Object)((Object)this)).getClass().getClassLoader().getResourceAsStream("schema-provenance.avsc");
        this.recordSchema = AvroTypeUtil.createSchema((Schema)new Schema.Parser().parse(schema));
    }

    @OnScheduled
    public void onScheduled(ConfigurationContext context) throws IOException {
        String[] targetComponentIdsExclude;
        String[] targetComponentIds;
        String[] targetEventTypesExclude;
        this.consumer = new ProvenanceEventConsumer();
        this.consumer.setStartPositionValue(context.getProperty(START_POSITION).getValue());
        this.consumer.setBatchSize(context.getProperty(SiteToSiteUtils.BATCH_SIZE).asInteger().intValue());
        this.consumer.setLogger(this.getLogger());
        this.consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).evaluateAttributeExpressions().getValue());
        this.consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).evaluateAttributeExpressions().getValue());
        this.consumer.setComponentNameRegex(context.getProperty(FILTER_COMPONENT_NAME).evaluateAttributeExpressions().getValue());
        this.consumer.setComponentNameRegexExclude(context.getProperty(FILTER_COMPONENT_NAME_EXCLUDE).evaluateAttributeExpressions().getValue());
        String[] targetEventTypes = StringUtils.stripAll((String[])StringUtils.split((String)context.getProperty(FILTER_EVENT_TYPE).evaluateAttributeExpressions().getValue(), (char)','));
        if (targetEventTypes != null) {
            for (String type : targetEventTypes) {
                try {
                    this.consumer.addTargetEventType(new ProvenanceEventType[]{ProvenanceEventType.valueOf((String)type)});
                }
                catch (Exception e) {
                    this.getLogger().warn(type + " is not a correct event type, removed from the filtering.");
                }
            }
        }
        if ((targetEventTypesExclude = StringUtils.stripAll((String[])StringUtils.split((String)context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).evaluateAttributeExpressions().getValue(), (char)','))) != null) {
            for (String type : targetEventTypesExclude) {
                try {
                    this.consumer.addTargetEventTypeExclude(new ProvenanceEventType[]{ProvenanceEventType.valueOf((String)type)});
                }
                catch (Exception e) {
                    this.getLogger().warn(type + " is not a correct event type, removed from the exclude filtering.");
                }
            }
        }
        if ((targetComponentIds = StringUtils.stripAll((String[])StringUtils.split((String)context.getProperty(FILTER_COMPONENT_ID).evaluateAttributeExpressions().getValue(), (char)','))) != null) {
            this.consumer.addTargetComponentId(targetComponentIds);
        }
        if ((targetComponentIdsExclude = StringUtils.stripAll((String[])StringUtils.split((String)context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).evaluateAttributeExpressions().getValue(), (char)','))) != null) {
            this.consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
        }
        this.consumer.setScheduled(true);
    }

    @OnUnscheduled
    public void onUnscheduled() {
        if (this.consumer != null) {
            this.consumer.setScheduled(false);
        }
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>(super.getSupportedPropertyDescriptors());
        properties.add(SiteToSiteUtils.PLATFORM);
        properties.add(FILTER_EVENT_TYPE);
        properties.add(FILTER_EVENT_TYPE_EXCLUDE);
        properties.add(FILTER_COMPONENT_TYPE);
        properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
        properties.add(FILTER_COMPONENT_ID);
        properties.add(FILTER_COMPONENT_ID_EXCLUDE);
        properties.add(FILTER_COMPONENT_NAME);
        properties.add(FILTER_COMPONENT_NAME_EXCLUDE);
        properties.add(START_POSITION);
        return properties;
    }

    public void onTrigger(ReportingContext context) {
        URL url;
        boolean isClustered = context.isClustered();
        String nodeId = context.getClusterNodeIdentifier();
        if (nodeId == null && isClustered) {
            this.getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. Will wait for Node Identifier to be established.");
            return;
        }
        ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus();
        String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName();
        String nifiUrl = context.getProperty(SiteToSiteUtils.INSTANCE_URL).evaluateAttributeExpressions().getValue();
        try {
            url = URI.create(nifiUrl).toURL();
        }
        catch (IllegalArgumentException | MalformedURLException e) {
            throw new AssertionError();
        }
        String hostname = url.getHost();
        String platform = context.getProperty(SiteToSiteUtils.PLATFORM).evaluateAttributeExpressions().getValue();
        Boolean allowNullValues = context.getProperty(ALLOW_NULL_VALUES).asBoolean();
        Map config = Collections.emptyMap();
        JsonBuilderFactory factory = Json.createBuilderFactory(config);
        JsonObjectBuilder builder = factory.createObjectBuilder();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        df.setTimeZone(TimeZone.getTimeZone("Z"));
        this.consumer.consumeEvents(context, (mapHolder, events) -> {
            long start = System.nanoTime();
            JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
            for (ProvenanceEventRecord event : events) {
                String componentName = mapHolder.getComponentName(event.getComponentId());
                String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
                String processGroupName = mapHolder.getComponentName(processGroupId);
                arrayBuilder.add((JsonValue)this.serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId, allowNullValues));
            }
            JsonArray jsonArray = arrayBuilder.build();
            Transaction transaction = null;
            try {
                this.setup((PropertyContext)context);
                transaction = this.getClient().createTransaction(TransferDirection.SEND);
                if (transaction == null) {
                    throw new ProcessException("All destination nodes are penalized; will attempt to send data later");
                }
                HashMap<String, String> attributes = new HashMap<String, String>();
                String transactionId = UUID.randomUUID().toString();
                attributes.put("reporting.task.transaction.id", transactionId);
                attributes.put("reporting.task.name", this.getName());
                attributes.put("reporting.task.uuid", this.getIdentifier());
                attributes.put("reporting.task.type", ((Object)((Object)this)).getClass().getSimpleName());
                attributes.put("mime.type", "application/json");
                this.sendData(context, transaction, attributes, jsonArray);
                transaction.confirm();
                transaction.complete();
                long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
                this.getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", new Object[]{events.size(), transferMillis, transactionId, ((ProvenanceEventRecord)events.get(0)).getEventId()});
            }
            catch (Exception e) {
                if (transaction != null) {
                    transaction.error();
                }
                if (e instanceof ProcessException) {
                    throw (ProcessException)e;
                }
                throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), (Throwable)e);
            }
        });
    }

    private JsonObject serialize(JsonBuilderFactory factory, JsonObjectBuilder builder, ProvenanceEventRecord event, DateFormat df, String componentName, String processGroupId, String processGroupName, String hostname, URL nifiUrl, String applicationName, String platform, String nodeIdentifier, Boolean allowNullValues) {
        this.addField(builder, "eventId", UUID.randomUUID().toString(), (boolean)allowNullValues);
        this.addField(builder, "eventOrdinal", event.getEventId(), (boolean)allowNullValues);
        this.addField(builder, "eventType", event.getEventType().name(), (boolean)allowNullValues);
        this.addField(builder, "timestampMillis", event.getEventTime(), (boolean)allowNullValues);
        this.addField(builder, "timestamp", df.format(event.getEventTime()), (boolean)allowNullValues);
        this.addField(builder, "durationMillis", event.getEventDuration(), (boolean)allowNullValues);
        this.addField(builder, "lineageStart", event.getLineageStartDate(), (boolean)allowNullValues);
        this.addField(builder, "details", event.getDetails(), (boolean)allowNullValues);
        this.addField(builder, "componentId", event.getComponentId(), (boolean)allowNullValues);
        this.addField(builder, "componentType", event.getComponentType(), (boolean)allowNullValues);
        this.addField(builder, "componentName", componentName, (boolean)allowNullValues);
        this.addField(builder, "processGroupId", processGroupId, (boolean)allowNullValues);
        this.addField(builder, "processGroupName", processGroupName, (boolean)allowNullValues);
        this.addField(builder, "entityId", event.getFlowFileUuid(), (boolean)allowNullValues);
        this.addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile", (boolean)allowNullValues);
        this.addField(builder, "entitySize", event.getFileSize(), (boolean)allowNullValues);
        this.addField(builder, "previousEntitySize", event.getPreviousFileSize(), (boolean)allowNullValues);
        SiteToSiteProvenanceReportingTask.addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes(), allowNullValues);
        SiteToSiteProvenanceReportingTask.addField(builder, factory, "previousAttributes", event.getPreviousAttributes(), allowNullValues);
        this.addField(builder, "actorHostname", hostname, (boolean)allowNullValues);
        if (nifiUrl != null) {
            String urlString = nifiUrl.toString();
            String urlPrefix = urlString.substring(0, urlString.length() - "/nifi".length());
            String contentUriBase = urlPrefix + "/nifi-api/provenance-events/" + event.getEventId() + "/content/";
            Object nodeIdSuffix = nodeIdentifier == null ? "" : "?clusterNodeId=" + nodeIdentifier;
            this.addField(builder, "contentURI", contentUriBase + "output" + (String)nodeIdSuffix, (boolean)allowNullValues);
            this.addField(builder, "previousContentURI", contentUriBase + "input" + (String)nodeIdSuffix, (boolean)allowNullValues);
        }
        this.addField(builder, factory, "parentIds", event.getParentUuids(), allowNullValues);
        this.addField(builder, factory, "childIds", event.getChildUuids(), allowNullValues);
        this.addField(builder, "transitUri", event.getTransitUri(), (boolean)allowNullValues);
        this.addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier(), (boolean)allowNullValues);
        this.addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri(), (boolean)allowNullValues);
        this.addField(builder, "platform", platform, (boolean)allowNullValues);
        this.addField(builder, "application", applicationName, (boolean)allowNullValues);
        return builder.build();
    }

    private static void addField(JsonObjectBuilder builder, JsonBuilderFactory factory, String key, Map<String, String> values, Boolean allowNullValues) {
        if (values != null) {
            JsonObjectBuilder mapBuilder = factory.createObjectBuilder();
            for (Map.Entry<String, String> entry : values.entrySet()) {
                if (entry.getKey() == null) continue;
                if (entry.getValue() == null) {
                    if (!allowNullValues.booleanValue()) continue;
                    mapBuilder.add(entry.getKey(), JsonValue.NULL);
                    continue;
                }
                mapBuilder.add(entry.getKey(), entry.getValue());
            }
            builder.add(key, mapBuilder);
        } else if (allowNullValues.booleanValue()) {
            builder.add(key, JsonValue.NULL);
        }
    }

    private void addField(JsonObjectBuilder builder, JsonBuilderFactory factory, String key, Collection<String> values, Boolean allowNullValues) {
        if (values != null) {
            builder.add(key, SiteToSiteProvenanceReportingTask.createJsonArray(factory, values));
        } else if (allowNullValues.booleanValue()) {
            builder.add(key, JsonValue.NULL);
        }
    }

    private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, Collection<String> values) {
        JsonArrayBuilder builder = factory.createArrayBuilder();
        for (String value : values) {
            if (value == null) continue;
            builder.add(value);
        }
        return builder;
    }
}

