/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.twitter;

import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.endpoint.Location;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.event.Event;
import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.twitter.ConsumeTwitter;
import org.apache.nifi.processors.twitter.LocationUtil;

/**
 * Processor that pulls status updates from Twitter's streaming API through the Hosebird
 * client and emits each received message as a FlowFile on {@link #REL_SUCCESS}.
 *
 * <p>Lifecycle as implemented here: {@link #onScheduled(ProcessContext)} prepares a
 * {@link ClientBuilder} from the configured properties, {@link #onTrigger(ProcessContext, ProcessSession)}
 * lazily connects a client, drains at most one client event and one message per invocation,
 * and {@link #shutdownClient()} stops the client.</p>
 *
 * <p>NOTE(review): dynamic properties are accepted by
 * {@link #getSupportedDynamicPropertyDescriptor(String)}, but nothing in this class reads them
 * back or adds them to the endpoint -- confirm whether that is intentional.</p>
 */
@Deprecated
@DeprecationNotice(alternatives={ConsumeTwitter.class}, reason="GetTwitter relies on the Twitter Hosebird client, which is not maintained. This processor will be removed in future releases.")
@SupportsBatching
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription(value="Pulls status changes from Twitter's streaming API. In versions starting with 1.9.0, the Consumer Key and Access Token are marked as sensitive according to https://developer.twitter.com/en/docs/basics/authentication/guides/securing-keys-and-tokens")
@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json")
@DynamicProperty(name="The name of a query parameter to add to the Twitter query", value="The value of a query parameter to add to the Twitter query", description="Allows users to specify the name/value of a query parameter to add to the Twitter query")
public class GetTwitter extends AbstractProcessor {
    /** Host used for every endpoint and for the provenance transit URI. */
    private static final String STREAM_HOST = "https://stream.twitter.com";

    // Selectable values for the ENDPOINT property.
    static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
    static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
    static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");

    // Supported (static) properties, registered in init().
    public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder().name("Twitter Endpoint").description("Specifies which endpoint data should be pulled from").required(true).allowableValues(new AllowableValue[]{ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER}).defaultValue(ENDPOINT_SAMPLE.getValue()).build();
    public static final PropertyDescriptor MAX_CLIENT_ERROR_RETRIES = new PropertyDescriptor.Builder().name("max-client-error-retries").displayName("Max Client Error Retries").description("The maximum number of retries to attempt when client experience retryable connection errors. Client continues attempting to reconnect using an exponential back-off pattern until it successfully reconnects or until it reaches the retry limit.  It is recommended to raise this value when client is getting rate limited by Twitter API. Default value is 5.").required(true).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("5").build();
    public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder().name("Consumer Key").description("The Consumer Key provided by Twitter").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder().name("Consumer Secret").description("The Consumer Secret provided by Twitter").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder().name("Access Token").description("The Access Token provided by Twitter").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder().name("Access Token Secret").description("The Access Token Secret provided by Twitter").required(true).sensitive(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder().name("Languages").description("A comma-separated list of languages for which tweets should be fetched").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder().name("IDs to Follow").description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.").required(false).addValidator((Validator)new FollowingValidator()).build();
    public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder().name("Locations to Filter On").description("A comma-separated list of coordinates specifying one or more bounding boxes to filter on.Each bounding box is specified by a pair of coordinates in the format: swLon,swLat,neLon,neLat. Multiple bounding boxes can be specified as such: swLon1,swLat1,neLon1,neLat1,swLon2,swLat2,neLon2,neLat2.Ignored unless Endpoint is set to 'Filter Endpoint'.").addValidator((Validator)new LocationValidator()).required(false).build();
    public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder().name("Terms to Filter On").description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    /** The only relationship; every emitted FlowFile is routed here. */
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All status updates will be routed to this relationship").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    /** Client events (errors, stops) handed to the builder in onScheduled; bounded at 1000 entries. */
    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(1000);
    /** Builder prepared by onScheduled and consumed by connectNewClient. */
    private volatile ClientBuilder clientBuilder;
    /** Currently active client, or null before the first connection attempt. */
    private volatile Client client;
    /** Raw messages handed to the StringDelimitedProcessor in onScheduled; bounded at 5000 entries. */
    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<>(5000);

    /**
     * Builds the immutable lists of supported properties and relationships.
     *
     * @param context initialization context (not used by this implementation)
     */
    protected void init(ProcessorInitializationContext context) {
        List<PropertyDescriptor> supported = new ArrayList<>();
        supported.add(ENDPOINT);
        supported.add(MAX_CLIENT_ERROR_RETRIES);
        supported.add(CONSUMER_KEY);
        supported.add(CONSUMER_SECRET);
        supported.add(ACCESS_TOKEN);
        supported.add(ACCESS_TOKEN_SECRET);
        supported.add(LANGUAGES);
        supported.add(TERMS);
        supported.add(FOLLOWING);
        supported.add(LOCATIONS);
        this.descriptors = Collections.unmodifiableList(supported);

        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(rels);
    }

    /** @return the unmodifiable set of relationships built in {@code init} */
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /** @return the unmodifiable list of property descriptors built in {@code init} */
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Describes a dynamic (user-named) property as an optional, non-empty value.
     *
     * @param propertyDescriptorName name chosen by the user
     * @return a dynamic, non-required descriptor carrying that name
     */
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
            .required(false)
            .dynamic(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    }

    /**
     * Cross-property validation: the Filter Endpoint needs at least one of terms,
     * followed IDs or locations to be set.
     *
     * @param validationContext context giving access to the configured property values
     * @return a list with one invalid result when the rule is violated, otherwise an empty list
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        List<ValidationResult> results = new ArrayList<>();
        String endpointName = validationContext.getProperty(ENDPOINT).getValue();
        boolean filterSelected = ENDPOINT_FILTER.getValue().equals(endpointName);
        if (filterSelected
                && !validationContext.getProperty(TERMS).isSet()
                && !validationContext.getProperty(FOLLOWING).isSet()
                && !validationContext.getProperty(LOCATIONS).isSet()) {
            // Quoting fixed: every property name is now wrapped in exactly one pair of quotes.
            results.add(new ValidationResult.Builder()
                .input("")
                .subject(FOLLOWING.getName())
                .valid(false)
                .explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "', '" + FOLLOWING.getName() + "' or '" + LOCATIONS.getName() + "' must be set")
                .build());
        }
        return results;
    }

    /**
     * Discards every message still buffered in the message queue whenever any property changes.
     *
     * @param descriptor the modified property (not inspected)
     * @param oldValue previous value (not inspected)
     * @param newValue new value (not inspected)
     */
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        this.messageQueue.clear();
    }

    /**
     * Reads the configured properties and prepares the {@link ClientBuilder} that
     * {@link #connectNewClient()} later uses. No connection is opened here.
     *
     * @param context gives access to the configured property values
     * @throws AssertionError if the endpoint property holds a value other than the three allowable ones
     * @throws NumberFormatException if a followed user ID cannot be parsed as a long
     */
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        String endpointName = context.getProperty(ENDPOINT).getValue();
        int maxRetries = context.getProperty(MAX_CLIENT_ERROR_RETRIES).asInteger();
        OAuth1 oauth = new OAuth1(
            context.getProperty(CONSUMER_KEY).getValue(),
            context.getProperty(CONSUMER_SECRET).getValue(),
            context.getProperty(ACCESS_TOKEN).getValue(),
            context.getProperty(ACCESS_TOKEN_SECRET).getValue());

        ClientBuilder builder = new ClientBuilder();
        builder.name("GetTwitter[id=" + this.getIdentifier() + "]")
            .authentication(oauth)
            .eventMessageQueue(this.eventQueue)
            .processor(new StringDelimitedProcessor(this.messageQueue));

        // null (not an empty list) means "no language restriction configured".
        String languageString = context.getProperty(LANGUAGES).getValue();
        List<String> languages = languageString == null ? null : splitAndTrim(languageString);

        // Declared as the common interface: the three branches create unrelated endpoint classes.
        StreamingEndpoint streamingEndpoint;
        if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) {
            StatusesSampleEndpoint sampleEndpoint = new StatusesSampleEndpoint();
            if (languages != null) {
                sampleEndpoint.languages(languages);
            }
            streamingEndpoint = sampleEndpoint;
        } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) {
            StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
            if (languages != null) {
                firehoseEndpoint.languages(languages);
            }
            streamingEndpoint = firehoseEndpoint;
        } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
            streamingEndpoint = buildFilterEndpoint(context, languages);
        } else {
            throw new AssertionError("Endpoint was invalid value: " + endpointName);
        }

        builder.hosts(STREAM_HOST).endpoint(streamingEndpoint);
        builder.retries(maxRetries);
        this.clientBuilder = builder;
    }

    /** Creates the filter endpoint from the followed-IDs, terms, languages and locations properties. */
    private static StatusesFilterEndpoint buildFilterEndpoint(ProcessContext context, List<String> languages) {
        StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();

        List<Long> followingIds;
        String followingString = context.getProperty(FOLLOWING).getValue();
        if (followingString == null) {
            followingIds = Collections.emptyList();
        } else {
            followingIds = new ArrayList<>();
            for (String rawId : followingString.split(",")) {
                followingIds.add(Long.parseLong(rawId.trim()));
            }
        }

        String termString = context.getProperty(TERMS).getValue();
        List<String> terms = termString == null ? Collections.<String>emptyList() : splitAndTrim(termString);

        if (!terms.isEmpty()) {
            filterEndpoint.trackTerms(terms);
        }
        if (!followingIds.isEmpty()) {
            filterEndpoint.followings(followingIds);
        }
        if (languages != null) {
            filterEndpoint.languages(languages);
        }

        List<Location> locations;
        String locationString = context.getProperty(LOCATIONS).getValue();
        if (locationString == null) {
            locations = Collections.emptyList();
        } else {
            locations = LocationUtil.parseLocations(locationString);
        }
        if (!locations.isEmpty()) {
            filterEndpoint.locations(locations);
        }
        return filterEndpoint;
    }

    /** Splits a comma-separated string and trims each piece; the input must not be null. */
    private static List<String> splitAndTrim(String commaSeparated) {
        List<String> values = new ArrayList<>();
        for (String piece : commaSeparated.split(",")) {
            values.add(piece.trim());
        }
        return values;
    }

    /**
     * Builds and connects a new client when there is none or the current one is done.
     * Synchronized so the check-build-connect sequence is not interleaved between threads.
     * A failed connect stops the freshly built client and is logged rather than rethrown.
     */
    public synchronized void connectNewClient() {
        if (this.client == null || this.client.isDone()) {
            this.client = this.clientBuilder.build();
            try {
                this.client.connect();
            }
            catch (Exception e) {
                // Stop first so the client is always shut down, then report why the connect failed.
                this.client.stop();
                this.getLogger().error("Failed to connect Twitter client; client has been stopped", e);
            }
        }
    }

    /** Stops the current client, if one was ever created. The field is left in place. */
    @OnStopped
    public void shutdownClient() {
        if (this.client != null) {
            this.client.stop();
        }
    }

    /**
     * Stops the client when this node loses the primary-node role.
     *
     * @param newState the new primary-node state reported by the framework
     */
    @OnPrimaryNodeStateChange
    public void onPrimaryNodeChange(PrimaryNodeState newState) {
        if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
            this.shutdownClient();
        }
    }

    /**
     * Ensures a client is connected, handles at most one pending client event, then turns at
     * most one queued message into a FlowFile routed to {@link #REL_SUCCESS}. Yields when the
     * client could not be connected or when no message is available.
     *
     * @param context used to yield the processor
     * @param session used to create, write, tag and transfer the FlowFile
     * @throws ProcessException declared by the signature; not thrown directly by this method
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        if (this.client == null || this.client.isDone()) {
            this.connectNewClient();
            if (this.client.isDone()) {
                context.yield();
                return;
            }
        }

        Event event = this.eventQueue.poll();
        if (event != null) {
            switch (event.getEventType()) {
                case STOPPED_BY_ERROR: {
                    this.getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[]{event.getEventType(), event.getMessage(), event.getUnderlyingException()});
                    break;
                }
                case CONNECTION_ERROR:
                case HTTP_ERROR: {
                    this.getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[]{event.getEventType(), event.getMessage()});
                    this.client.reconnect();
                    break;
                }
                default: {
                    // Every other event type is ignored.
                    break;
                }
            }
        }

        final String tweet = this.messageQueue.poll();
        if (tweet == null) {
            context.yield();
            return;
        }

        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, new OutputStreamCallback(){

            public void process(OutputStream out) throws IOException {
                out.write(tweet.getBytes(StandardCharsets.UTF_8));
            }
        });

        // Both values are strings, so the map is typed accordingly for putAllAttributes.
        HashMap<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
        flowFile = session.putAllAttributes(flowFile, attributes);
        session.transfer(flowFile, REL_SUCCESS);
        session.getProvenanceReporter().receive(flowFile, STREAM_HOST + this.client.getEndpoint().getURI());
    }

    /**
     * Validates a comma-separated list of numeric user IDs. Each entry (after trimming) must
     * consist of digits only and must fit in a {@code long}, because {@code onScheduled}
     * parses the entries with {@link Long#parseLong(String)}.
     */
    private static class FollowingValidator
    implements Validator {
        private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");

        private FollowingValidator() {
        }

        /**
         * @param subject name of the property being validated
         * @param input the raw property value
         * @param context validation context (not used)
         * @return a valid result when every entry is a user ID that fits in a long, otherwise an invalid one
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            for (String split : input.split(",")) {
                String candidate = split.trim();
                if (NUMBER_PATTERN.matcher(candidate).matches() && fitsInLong(candidate)) {
                    continue;
                }
                return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of User ID's").build();
            }
            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
        }

        /** Returns true when the digit string can be parsed as a long without overflow. */
        private static boolean fitsInLong(String digits) {
            try {
                Long.parseLong(digits);
                return true;
            }
            catch (NumberFormatException ignored) {
                // Too many digits for a long: treated as an invalid ID instead of failing at schedule time.
                return false;
            }
        }
    }

    /**
     * Validates a bounding-box list: it must be parseable by {@code LocationUtil.parseLocations}
     * and, for every box, the south-west coordinate must be strictly less than the north-east
     * coordinate in both longitude and latitude.
     */
    private static class LocationValidator
    implements Validator {
        private LocationValidator() {
        }

        /**
         * @param subject name of the property being validated
         * @param input the raw property value
         * @param context validation context (not used)
         * @return a valid result when every box is well-formed, otherwise an invalid one explaining the first problem
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            try {
                List<Location> locations = LocationUtil.parseLocations(input);
                for (Location location : locations) {
                    Location.Coordinate sw = location.southwestCoordinate();
                    Location.Coordinate ne = location.northeastCoordinate();
                    if (sw.longitude() > ne.longitude()) {
                        return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude (" + sw.longitude() + ") must be less than NE Longitude (" + ne.longitude() + ").").build();
                    }
                    if (sw.longitude() == ne.longitude()) {
                        return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude (" + sw.longitude() + ") can not be equal to NE Longitude (" + ne.longitude() + ").").build();
                    }
                    if (sw.latitude() > ne.latitude()) {
                        return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude (" + sw.latitude() + ") must be less than NE Latitude (" + ne.latitude() + ").").build();
                    }
                    if (sw.latitude() == ne.latitude()) {
                        return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude (" + sw.latitude() + ") can not be equal to NE Latitude (" + ne.latitude() + ").").build();
                    }
                }
                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
            }
            catch (IllegalStateException e) {
                // An IllegalStateException from parsing is reported as a malformed list.
                return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be a comma-separated list of longitude,latitude pairs specifying one or more bounding boxes.").build();
            }
        }
    }
}

