/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.twitter;

import com.twitter.clientlib.ApiClient;
import com.twitter.clientlib.ApiException;
import com.twitter.clientlib.TwitterCredentialsBearer;
import com.twitter.clientlib.api.TwitterApi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.twitter.ConsumeTwitter;
import org.apache.nifi.processors.twitter.StreamEndpoint;

public class TweetStreamService {
    private final BlockingQueue<String> queue;
    private final ComponentLog logger;
    private ScheduledExecutorService executorService;
    private final ThreadFactory threadFactory;
    private final Set<String> tweetFields;
    private final Set<String> userFields;
    private final Set<String> mediaFields;
    private final Set<String> pollFields;
    private final Set<String> placeFields;
    private final Set<String> expansions;
    private final int backfillMinutes;
    private final TwitterApi api;
    private InputStream stream;
    private final int backoffAttempts;
    private final long backoffTime;
    private final long maximumBackoff;
    private long backoffMultiplier;
    private int attemptCounter;
    private final StreamEndpoint endpoint;

    public TweetStreamService(ProcessContext context, BlockingQueue<String> queue, ComponentLog logger) {
        Objects.requireNonNull(context);
        Objects.requireNonNull(queue);
        Objects.requireNonNull(logger);
        this.queue = queue;
        this.logger = logger;
        String endpointName = context.getProperty(ConsumeTwitter.ENDPOINT).getValue();
        this.endpoint = ConsumeTwitter.ENDPOINT_SAMPLE.getValue().equals(endpointName) ? StreamEndpoint.SAMPLE_ENDPOINT : StreamEndpoint.SEARCH_ENDPOINT;
        this.tweetFields = this.parseCommaSeparatedProperties(context, ConsumeTwitter.TWEET_FIELDS);
        this.userFields = this.parseCommaSeparatedProperties(context, ConsumeTwitter.USER_FIELDS);
        this.mediaFields = this.parseCommaSeparatedProperties(context, ConsumeTwitter.MEDIA_FIELDS);
        this.pollFields = this.parseCommaSeparatedProperties(context, ConsumeTwitter.POLL_FIELDS);
        this.placeFields = this.parseCommaSeparatedProperties(context, ConsumeTwitter.PLACE_FIELDS);
        this.expansions = this.parseCommaSeparatedProperties(context, ConsumeTwitter.EXPANSIONS);
        this.backfillMinutes = context.getProperty(ConsumeTwitter.BACKFILL_MINUTES).asInteger();
        this.backoffMultiplier = 1L;
        this.backoffAttempts = context.getProperty(ConsumeTwitter.BACKOFF_ATTEMPTS).asInteger();
        this.attemptCounter = 0;
        this.backoffTime = context.getProperty(ConsumeTwitter.BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS);
        this.maximumBackoff = context.getProperty(ConsumeTwitter.MAXIMUM_BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS);
        ApiClient client = new ApiClient();
        int connectTimeout = context.getProperty(ConsumeTwitter.CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        int readTimeout = context.getProperty(ConsumeTwitter.READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        TwitterCredentialsBearer bearer = new TwitterCredentialsBearer(context.getProperty(ConsumeTwitter.BEARER_TOKEN).getValue());
        client.setConnectTimeout(connectTimeout);
        client.setReadTimeout(readTimeout);
        client.setTwitterCredentials(bearer);
        this.api = new TwitterApi(client);
        String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue();
        this.api.getApiClient().setBasePath(basePath);
        this.threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
    }

    public String getTransitUri(String endpoint) {
        if (endpoint.equals(StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName())) {
            return this.api.getApiClient().getBasePath() + StreamEndpoint.SAMPLE_ENDPOINT.getPath();
        }
        if (endpoint.equals(StreamEndpoint.SEARCH_ENDPOINT.getEndpointName())) {
            return this.api.getApiClient().getBasePath() + StreamEndpoint.SEARCH_ENDPOINT.getPath();
        }
        this.logger.warn("Unrecognized endpoint in getTransitUri. Returning basePath");
        return this.api.getApiClient().getBasePath();
    }

    public void start() {
        this.executorService = Executors.newSingleThreadScheduledExecutor(this.threadFactory);
        this.executorService.execute(new TweetStreamStarter());
    }

    public void stop() {
        if (this.stream != null) {
            try {
                this.stream.close();
            }
            catch (IOException e) {
                this.logger.error("Closing response stream failed", (Throwable)e);
            }
        }
        this.executorService.shutdownNow();
        this.executorService = null;
    }

    private Long calculateBackoffDelay() {
        long backoff = this.backoffMultiplier * this.backoffTime;
        return Math.min(backoff, this.maximumBackoff);
    }

    private void scheduleStartStreamWithBackoff() {
        if (this.attemptCounter >= this.backoffAttempts) {
            throw new ProcessException(String.format("Connection failed after maximum attempts [%d]", this.attemptCounter));
        }
        ++this.attemptCounter;
        long delay = this.calculateBackoffDelay();
        this.backoffMultiplier *= 2L;
        this.logger.info("Scheduling new stream connection after delay [{} s]", new Object[]{delay});
        this.executorService.schedule(new TweetStreamStarter(), delay, TimeUnit.SECONDS);
    }

    private void resetBackoff() {
        this.attemptCounter = 0;
        this.backoffMultiplier = 1L;
    }

    private Set<String> parseCommaSeparatedProperties(ProcessContext context, PropertyDescriptor property) {
        HashSet<String> fields = null;
        if (context.getProperty(property).isSet()) {
            fields = new HashSet<String>();
            String fieldsString = context.getProperty(property).getValue();
            for (String field : fieldsString.split(",")) {
                fields.add(field.trim());
            }
        }
        return fields;
    }

    private class TweetStreamHandler
    implements Runnable {
        private TweetStreamHandler() {
        }

        @Override
        public void run() {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(TweetStreamService.this.stream, StandardCharsets.UTF_8));){
                String tweetRecord = reader.readLine();
                while (tweetRecord != null) {
                    if (tweetRecord.isEmpty()) {
                        tweetRecord = reader.readLine();
                        continue;
                    }
                    TweetStreamService.this.queue.put(tweetRecord);
                    TweetStreamService.this.resetBackoff();
                    tweetRecord = reader.readLine();
                }
            }
            catch (IOException e) {
                TweetStreamService.this.logger.info("Stream is closed or has stopped", (Throwable)e);
            }
            catch (InterruptedException e) {
                TweetStreamService.this.logger.info("Interrupted while adding Tweet to queue", (Throwable)e);
                return;
            }
            TweetStreamService.this.logger.info("Stream processing completed");
            TweetStreamService.this.scheduleStartStreamWithBackoff();
        }
    }

    private class TweetStreamStarter
    implements Runnable {
        private TweetStreamStarter() {
        }

        @Override
        public void run() {
            try {
                TweetStreamService.this.stream = TweetStreamService.this.endpoint.equals((Object)StreamEndpoint.SAMPLE_ENDPOINT) ? TweetStreamService.this.api.tweets().sampleStream().expansions(TweetStreamService.this.expansions).tweetFields(TweetStreamService.this.tweetFields).userFields(TweetStreamService.this.userFields).mediaFields(TweetStreamService.this.mediaFields).placeFields(TweetStreamService.this.placeFields).pollFields(TweetStreamService.this.pollFields).backfillMinutes(Integer.valueOf(TweetStreamService.this.backfillMinutes)).execute() : TweetStreamService.this.api.tweets().searchStream().expansions(TweetStreamService.this.expansions).tweetFields(TweetStreamService.this.tweetFields).userFields(TweetStreamService.this.userFields).mediaFields(TweetStreamService.this.mediaFields).placeFields(TweetStreamService.this.placeFields).pollFields(TweetStreamService.this.pollFields).backfillMinutes(Integer.valueOf(TweetStreamService.this.backfillMinutes)).execute();
                TweetStreamService.this.executorService.execute(new TweetStreamHandler());
            }
            catch (ApiException e) {
                TweetStreamService.this.stream = null;
                TweetStreamService.this.logger.warn("Twitter Stream [{}] API connection failed: HTTP {}", new Object[]{TweetStreamService.this.endpoint.getEndpointName(), e.getCode(), e});
                TweetStreamService.this.scheduleStartStreamWithBackoff();
            }
            catch (Exception e) {
                TweetStreamService.this.stream = null;
                TweetStreamService.this.logger.warn("Twitter Stream [{}] connection failed", new Object[]{TweetStreamService.this.endpoint.getEndpointName(), e});
                TweetStreamService.this.scheduleStartStreamWithBackoff();
            }
        }
    }
}

