/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.riemann;

import com.aphyr.riemann.Proto;
import com.aphyr.riemann.client.RiemannClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

/**
 * NiFi processor that converts each incoming FlowFile into a Riemann event and
 * writes the resulting batch to a Riemann server over TCP or UDP.
 *
 * <p>Event fields (service, state, metric, ...) are taken from the processor
 * properties, each evaluated against the FlowFile's attributes. Every property
 * that is not one of the built-in descriptors is attached to the event as a
 * custom Riemann attribute.
 */
@Tags({"riemann", "monitoring", "metrics"})
@DynamicProperty(name="Custom Event Attribute", expressionLanguageScope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, description="These values will be attached to the Riemann event as a custom attribute", value="Any value or expression")
@CapabilityDescription("Send events to Riemann (http://riemann.io) when FlowFiles pass through this processor. You can use events to notify Riemann that a FlowFile passed through, or you can attach a more meaningful metric, such as, the time a FlowFile took to get to this processor. All attributes attached to events support the NiFi Expression Language.")
@SupportsBatching
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
public class PutRiemann
extends AbstractProcessor {
    /** Active client, or {@code null} when no connection has been established. */
    protected volatile RiemannClient riemannClient = null;
    /** Transport selected the last time a connection was attempted. */
    protected volatile Transport transport;

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Metrics successfully written to Riemann")
            .build();
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("Metrics which failed to write to Riemann")
            .build();

    // --- Connection settings -------------------------------------------------
    public static final PropertyDescriptor RIEMANN_HOST = new PropertyDescriptor.Builder()
            .name("Riemann Address")
            .description("Hostname of Riemann server")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor RIEMANN_PORT = new PropertyDescriptor.Builder()
            .name("Riemann Port")
            .description("Port that Riemann is listening on")
            .required(true)
            .defaultValue("5555")
            .addValidator(StandardValidators.PORT_VALIDATOR)
            .build();
    public static final PropertyDescriptor TRANSPORT_PROTOCOL = new PropertyDescriptor.Builder()
            .name("Transport Protocol")
            .description("Transport protocol to speak to Riemann in")
            .required(true)
            .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
            .defaultValue("TCP")
            .build();
    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("Batch Size")
            .description("Batch size for incoming FlowFiles")
            .required(false)
            .defaultValue("100")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();

    // --- Event fields (all evaluated against FlowFile attributes) ------------
    public static final PropertyDescriptor ATTR_SERVICE = new PropertyDescriptor.Builder()
            .name("Service")
            .description("Name of service associated to this event (e.g. FTP File Fetched)")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor ATTR_STATE = new PropertyDescriptor.Builder()
            .name("State")
            .description("State of service associated to this event in string form (e.g. ok, warning, foo)")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor ATTR_TIME = new PropertyDescriptor.Builder()
            .name("Time")
            .description("Time of event in unix epoch seconds (long), default: (current time)")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor ATTR_HOST = new PropertyDescriptor.Builder()
            .name("Host")
            .description("A hostname associated to this event (e.g. nifi-app1)")
            .required(false)
            .defaultValue("${hostname()}")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor ATTR_TTL = new PropertyDescriptor.Builder()
            .name("TTL")
            .description("Floating point value in seconds until Riemann considers this event as \"expired\"")
            .required(false)
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
    public static final PropertyDescriptor ATTR_METRIC = new PropertyDescriptor.Builder()
            .name("Metric")
            .description("Floating point number associated to this event")
            .required(false)
            .addValidator(Validator.VALID)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .build();
    public static final PropertyDescriptor ATTR_DESCRIPTION = new PropertyDescriptor.Builder()
            .name("Description")
            .description("Description associated to the event")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor ATTR_TAGS = new PropertyDescriptor.Builder()
            .name("Tags")
            .description("Comma separated list of tags associated to the event")
            .required(false)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(Validator.VALID)
            .build();
    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
            .name("Timeout")
            .description("Timeout in milliseconds when writing events to Riemann")
            .required(true)
            .defaultValue("1000")
            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
            .build();

    /**
     * Properties configured on the processor that are not built-in descriptors.
     * The list is never mutated after publication; it is replaced by reference
     * so that a thread iterating it in {@link #onTrigger} is not disturbed by a
     * concurrent reset.
     */
    private volatile List<PropertyDescriptor> customAttributes = new ArrayList<>();

    /** Relationships exposed by this processor (read-only). */
    private static final Set<Relationship> RELATIONSHIPS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

    /** Built-in property descriptors, in the order they are presented (read-only). */
    private static final List<PropertyDescriptor> LOCAL_PROPERTIES =
            Collections.unmodifiableList(Arrays.asList(
                    RIEMANN_HOST,
                    RIEMANN_PORT,
                    TRANSPORT_PROTOCOL,
                    TIMEOUT,
                    BATCH_SIZE,
                    ATTR_DESCRIPTION,
                    ATTR_SERVICE,
                    ATTR_STATE,
                    ATTR_METRIC,
                    ATTR_TTL,
                    ATTR_TAGS,
                    ATTR_HOST,
                    ATTR_TIME));

    /** Maximum FlowFiles pulled per trigger; {@code -1} means "not yet read from the context". */
    private volatile int batchSize = -1;
    /** Milliseconds to wait for the server's reply when using TCP. */
    private volatile long writeTimeout = 1000L;

    /**
     * @return the {@code success} and {@code failure} relationships
     */
    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * @return the built-in (non-dynamic) property descriptors
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return LOCAL_PROPERTIES;
    }

    /**
     * Builds the descriptor used for any user-defined property.
     *
     * @param propertyDescriptorName name chosen by the user; becomes the key of the custom event attribute
     * @return an optional, dynamic descriptor that accepts any value and supports expression language
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .addValidator(Validator.VALID)
                .required(false)
                .dynamic(true)
                .build();
    }

    /**
     * Closes the current client (if any) and resets the cached configuration so
     * that the next {@link #onScheduled} call re-reads it from the context.
     */
    @OnStopped
    public final void cleanUpClient() {
        final RiemannClient client = this.riemannClient;
        // Reset state first so it is cleared even if close() throws.
        this.riemannClient = null;
        this.batchSize = -1;
        this.customAttributes = new ArrayList<>();
        if (client != null) {
            client.close();
        }
    }

    /**
     * Reads the processor configuration and, when no connected client exists,
     * opens a new connection to Riemann.
     *
     * @param context source of the configured property values
     * @throws ProcessException if the connection cannot be established or the
     *         configured transport is not supported
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) throws ProcessException {
        if (this.batchSize == -1) {
            this.batchSize = context.getProperty(BATCH_SIZE).asInteger();
        }

        // Snapshot the volatile field so the null check and isConnected() see the same object.
        final RiemannClient current = this.riemannClient;
        if (current == null || !current.isConnected()) {
            final Transport selected = Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
            this.transport = selected;
            final String host = context.getProperty(RIEMANN_HOST).getValue().trim();
            final int port = context.getProperty(RIEMANN_PORT).asInteger();
            this.writeTimeout = context.getProperty(TIMEOUT).asLong();

            RiemannClient client = null;
            try {
                switch (selected) {
                    case TCP:
                        client = RiemannClient.tcp(host, port);
                        break;
                    case UDP:
                        client = RiemannClient.udp(host, port);
                        break;
                    default:
                        // Guards against a new enum constant being added without a matching case.
                        throw new ProcessException("Unsupported transport protocol: " + selected);
                }
                client.connect();
                this.riemannClient = client;
            } catch (IOException e) {
                if (client != null) {
                    client.close();
                }
                context.yield();
                // Keep the original message but chain the cause so the stack trace is not lost.
                throw new ProcessException(
                        String.format("Unable to connect to Riemann [%s:%d] (%s)\n%s", host, port, selected, e.getMessage()),
                        e);
            }
        }

        if (this.customAttributes.isEmpty()) {
            final List<PropertyDescriptor> builtIn = this.getSupportedPropertyDescriptors();
            final List<PropertyDescriptor> discovered = new ArrayList<>();
            for (PropertyDescriptor descriptor : context.getProperties().keySet()) {
                if (!builtIn.contains(descriptor)) {
                    discovered.add(descriptor);
                }
            }
            // Publish the fully built list in one step instead of filling the shared one in place.
            this.customAttributes = discovered;
        }
    }

    /**
     * Pulls up to {@code batchSize} FlowFiles, converts each to a Riemann event
     * and sends the batch. FlowFiles whose numeric fields cannot be parsed go to
     * {@link #REL_FAILURE}; the rest go to {@link #REL_SUCCESS} once the write
     * completes.
     *
     * @param context source of the configured property values
     * @param session session used to obtain and route FlowFiles
     * @throws ProcessException if (re)connecting fails, the TCP write times out,
     *         or sending/flushing raises an exception
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final RiemannClient existing = this.riemannClient;
        if (existing == null || !existing.isConnected()) {
            this.cleanUpClient();
            this.onScheduled(context);
        }

        final List<FlowFile> incomingFlowFiles = session.get(this.batchSize);
        if (incomingFlowFiles.isEmpty()) {
            return;
        }

        final List<FlowFile> convertedFlowFiles = new ArrayList<>(incomingFlowFiles.size());
        final List<Proto.Event> events = new ArrayList<>(incomingFlowFiles.size());
        final List<PropertyDescriptor> customProperties = this.customAttributes;
        for (FlowFile flowFile : incomingFlowFiles) {
            try {
                events.add(FlowFileToEvent.fromAttributes(context, customProperties, flowFile));
                convertedFlowFiles.add(flowFile);
            } catch (NumberFormatException e) {
                this.getLogger().warn("Unable to create Riemann event.", e);
                session.transfer(flowFile, REL_FAILURE);
            }
        }
        if (events.isEmpty()) {
            // Every FlowFile was routed to failure; there is nothing to send.
            return;
        }

        final RiemannClient client = this.riemannClient;
        try {
            if (this.transport == Transport.TCP) {
                // A null reply after the timeout is treated as a failed write.
                final Proto.Msg reply = client.sendEvents(events).deref(this.writeTimeout, TimeUnit.MILLISECONDS);
                if (reply == null) {
                    // No yield here: the catch block below already yields once.
                    throw new ProcessException("Timed out writing to Riemann!");
                }
            } else {
                // Non-TCP sends do not wait for a reply.
                client.sendEvents(events);
            }
            client.flush();
            session.transfer(convertedFlowFiles, REL_SUCCESS);
        } catch (Exception e) {
            context.yield();
            // Hand the whole batch back to the incoming queue before signalling the failure.
            session.transfer(incomingFlowFiles);
            throw new ProcessException("Failed writing to Riemann\n" + e.getMessage(), e);
        }
    }

    /** Translates a FlowFile plus the processor configuration into a Riemann event. */
    private static class FlowFileToEvent {
        private FlowFileToEvent() {
        }

        /**
         * Builds an event from the configured properties, each evaluated against
         * {@code flowFile}. Properties whose evaluated value is blank are left
         * unset on the event.
         *
         * @param context source of the configured property values
         * @param customProperties descriptors to attach as custom event attributes
         * @param flowFile FlowFile whose attributes are used for expression evaluation
         * @return the assembled event
         * @throws NumberFormatException presumably when Metric, Time or TTL is not
         *         numeric; the caller routes the FlowFile to failure in that case
         */
        protected static Proto.Event fromAttributes(ProcessContext context, List<PropertyDescriptor> customProperties, FlowFile flowFile) {
            final Proto.Event.Builder builder = Proto.Event.newBuilder();

            final PropertyValue service = context.getProperty(ATTR_SERVICE).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(service.getValue())) {
                builder.setService(service.getValue());
            }

            final PropertyValue description = context.getProperty(ATTR_DESCRIPTION).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(description.getValue())) {
                builder.setDescription(description.getValue());
            }

            final PropertyValue metric = context.getProperty(ATTR_METRIC).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(metric.getValue())) {
                builder.setMetricF(metric.asFloat());
            }

            final PropertyValue time = context.getProperty(ATTR_TIME).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(time.getValue())) {
                builder.setTime(time.asLong());
            }

            final PropertyValue state = context.getProperty(ATTR_STATE).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(state.getValue())) {
                builder.setState(state.getValue());
            }

            final PropertyValue ttl = context.getProperty(ATTR_TTL).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(ttl.getValue())) {
                builder.setTtl(ttl.asFloat());
            }

            final PropertyValue host = context.getProperty(ATTR_HOST).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(host.getValue())) {
                builder.setHost(host.getValue());
            }

            final PropertyValue tags = context.getProperty(ATTR_TAGS).evaluateAttributeExpressions(flowFile);
            if (StringUtils.isNotBlank(tags.getValue())) {
                for (String rawTag : tags.getValue().split(",")) {
                    final String tag = rawTag.trim();
                    // Skip empty entries produced by input such as "a,,b" or "a, ".
                    if (!tag.isEmpty()) {
                        builder.addTags(tag);
                    }
                }
            }

            for (PropertyDescriptor customProperty : customProperties) {
                final String value = context.getProperty(customProperty).evaluateAttributeExpressions(flowFile).getValue();
                if (StringUtils.isNotBlank(value)) {
                    builder.addAttributes(Proto.Attribute.newBuilder()
                            .setKey(customProperty.getName())
                            .setValue(value)
                            .build());
                }
            }
            return builder.build();
        }
    }

    /** Wire protocols that can be selected through {@link #TRANSPORT_PROTOCOL}. */
    protected static enum Transport {
        TCP,
        UDP;

    }
}

