package org.apache.nifi.processors.twitter;

import com.twitter.clientlib.ApiClient;
import com.twitter.clientlib.ApiException;
import com.twitter.clientlib.TwitterCredentialsBearer;
import com.twitter.clientlib.api.TwitterApi;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;

/* loaded from: input_file:org/apache/nifi/processors/twitter/TweetStreamService.class */
/**
 * Maintains a streaming connection to the Twitter API and places every non-empty line read
 * from the response stream onto the queue supplied at construction time.
 *
 * <p>Connecting and reading both run on a single-threaded scheduled executor owned by this
 * service. When a connection attempt fails, or an open stream ends, a new connection attempt
 * is scheduled with an exponentially growing delay (doubling each attempt, capped at the
 * configured maximum) until the configured number of attempts is exhausted.
 *
 * <p>{@link #start()} and {@link #stop()} are expected to be called from outside the executor
 * thread; the backoff counters are only touched by tasks running on the executor.
 */
public class TweetStreamService {
    private final BlockingQueue<String> queue;
    private final ComponentLog logger;
    private final ScheduledExecutorService executorService;
    private final Set<String> tweetFields;
    private final Set<String> userFields;
    private final Set<String> mediaFields;
    private final Set<String> pollFields;
    private final Set<String> placeFields;
    private final Set<String> expansions;
    private final int backfillMinutes;
    private final TwitterApi api;
    // Written by TweetStreamStarter on the executor thread and read by stop() on the caller's
    // thread, hence volatile.
    private volatile InputStream stream;
    private final int backoffAttempts;
    private final long backoffTime;
    private final long maximumBackoff;
    private long backoffMultiplier;
    private int attemptCounter;
    private final StreamEndpoint endpoint;

    /**
     * Reads the currently open response stream line by line and forwards each non-empty line
     * to the queue. When the stream ends or fails, a reconnect is scheduled.
     */
    private class TweetStreamHandler implements Runnable {
        private TweetStreamHandler() {
        }

        @Override
        public void run() {
            // try-with-resources guarantees the reader (and the wrapped stream) is closed on
            // every exit path, including I/O failure and interruption.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    if (line.isEmpty()) {
                        // Blank lines carry no payload (presumably keep-alive heartbeats).
                        continue;
                    }
                    queue.put(line);
                    // Data is flowing again, so the next failure starts from the base delay.
                    resetBackoff();
                }
            } catch (IOException e) {
                logger.info("Stream is closed or has stopped", e);
            } catch (InterruptedException e) {
                logger.info("Interrupted while adding Tweet to queue", e);
                // Restore the interrupt flag so the executor can observe the interruption.
                Thread.currentThread().interrupt();
                return;
            }
            logger.info("Stream processing completed");
            scheduleStartStreamWithBackoff();
        }
    }

    /**
     * Opens a stream on the configured endpoint and hands it to a {@link TweetStreamHandler}.
     * Any failure clears the stream reference and schedules another attempt with backoff.
     */
    public class TweetStreamStarter implements Runnable {
        private TweetStreamStarter() {
        }

        @Override
        public void run() {
            try {
                if (endpoint.equals(StreamEndpoint.SAMPLE_ENDPOINT)) {
                    stream = api.tweets().sampleStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes);
                } else {
                    stream = api.tweets().searchStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes);
                }
                executorService.execute(new TweetStreamHandler());
            } catch (ApiException e) {
                // Must precede the generic handler: ApiException is a subclass of Exception,
                // and this branch additionally reports the HTTP status code.
                stream = null;
                logger.warn("Twitter Stream [{}] API connection failed: HTTP {}", new Object[]{endpoint.getEndpointName(), e.getCode(), e});
                scheduleStartStreamWithBackoff();
            } catch (Exception e) {
                stream = null;
                logger.warn("Twitter Stream [{}] connection failed", new Object[]{endpoint.getEndpointName(), e});
                scheduleStartStreamWithBackoff();
            }
        }
    }

    /**
     * Builds the service from the processor configuration. No connection is opened until
     * {@link #start()} is called.
     *
     * @param processContext source of the endpoint, field/expansion lists, backfill, backoff,
     *                       timeout, bearer token and base path properties
     * @param blockingQueue  destination for the lines read from the stream
     * @param componentLog   logger used for all diagnostics
     * @throws NullPointerException if any argument is {@code null}
     */
    public TweetStreamService(ProcessContext processContext, BlockingQueue<String> blockingQueue, ComponentLog componentLog) {
        Objects.requireNonNull(processContext, "processContext");
        Objects.requireNonNull(blockingQueue, "blockingQueue");
        Objects.requireNonNull(componentLog, "componentLog");
        this.queue = blockingQueue;
        this.logger = componentLog;

        // Anything other than the sample endpoint value selects the search endpoint.
        if (ConsumeTwitter.ENDPOINT_SAMPLE.getValue().equals(processContext.getProperty(ConsumeTwitter.ENDPOINT).getValue())) {
            this.endpoint = StreamEndpoint.SAMPLE_ENDPOINT;
        } else {
            this.endpoint = StreamEndpoint.SEARCH_ENDPOINT;
        }

        this.tweetFields = parseCommaSeparatedProperties(processContext, ConsumeTwitter.TWEET_FIELDS);
        this.userFields = parseCommaSeparatedProperties(processContext, ConsumeTwitter.USER_FIELDS);
        this.mediaFields = parseCommaSeparatedProperties(processContext, ConsumeTwitter.MEDIA_FIELDS);
        this.pollFields = parseCommaSeparatedProperties(processContext, ConsumeTwitter.POLL_FIELDS);
        this.placeFields = parseCommaSeparatedProperties(processContext, ConsumeTwitter.PLACE_FIELDS);
        this.expansions = parseCommaSeparatedProperties(processContext, ConsumeTwitter.EXPANSIONS);
        this.backfillMinutes = processContext.getProperty(ConsumeTwitter.BACKFILL_MINUTES).asInteger().intValue();

        this.backoffMultiplier = 1L;
        this.backoffAttempts = processContext.getProperty(ConsumeTwitter.BACKOFF_ATTEMPTS).asInteger().intValue();
        this.attemptCounter = 0;
        this.backoffTime = processContext.getProperty(ConsumeTwitter.BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS).longValue();
        this.maximumBackoff = processContext.getProperty(ConsumeTwitter.MAXIMUM_BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS).longValue();

        final int connectTimeoutMillis = processContext.getProperty(ConsumeTwitter.CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        final int readTimeoutMillis = processContext.getProperty(ConsumeTwitter.READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        final ApiClient apiClient = new ApiClient();
        this.api = new TwitterApi(apiClient.setConnectTimeout(connectTimeoutMillis).setReadTimeout(readTimeoutMillis));
        this.api.setTwitterCredentials(new TwitterCredentialsBearer(processContext.getProperty(ConsumeTwitter.BEARER_TOKEN).getValue()));
        this.api.getApiClient().setBasePath(processContext.getProperty(ConsumeTwitter.BASE_PATH).getValue());

        this.executorService = Executors.newSingleThreadScheduledExecutor(
                new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build());
    }

    /**
     * Resolves the full URI for the given endpoint name.
     *
     * @param str endpoint name to resolve; {@code null} is treated as unrecognized
     * @return the API base path followed by the endpoint's path, or just the base path
     *         (with a warning logged) when the name matches neither known endpoint
     */
    public String getTransitUri(String str) {
        final String basePath = this.api.getApiClient().getBasePath();
        // Constant-first comparison so a null name falls through to the warning branch.
        if (StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName().equals(str)) {
            return basePath + StreamEndpoint.SAMPLE_ENDPOINT.getPath();
        }
        if (StreamEndpoint.SEARCH_ENDPOINT.getEndpointName().equals(str)) {
            return basePath + StreamEndpoint.SEARCH_ENDPOINT.getPath();
        }
        this.logger.warn("Unrecognized endpoint in getTransitUri. Returning basePath");
        return basePath;
    }

    /**
     * Submits the first connection attempt to the executor and returns immediately.
     */
    public void start() {
        this.executorService.execute(new TweetStreamStarter());
    }

    /**
     * Closes the open response stream, if any, and shuts the executor down. A failure to close
     * the stream is logged and does not prevent the executor shutdown.
     */
    public void stop() {
        // Read the volatile field once so the null check and the close act on the same object.
        final InputStream openStream = this.stream;
        if (openStream != null) {
            try {
                openStream.close();
            } catch (IOException e) {
                this.logger.error("Closing response stream failed", e);
            }
        }
        this.executorService.shutdownNow();
    }

    /**
     * Computes the delay, in seconds, before the next connection attempt:
     * {@code backoffMultiplier * backoffTime}, capped at {@code maximumBackoff}.
     */
    private long calculateBackoffDelay() {
        // Compare by division first: the plain product can overflow long once the multiplier
        // has doubled many times, which would yield a negative (i.e. immediate) delay.
        if (this.backoffTime > 0 && this.backoffMultiplier > this.maximumBackoff / this.backoffTime) {
            return this.maximumBackoff;
        }
        return Math.min(this.backoffMultiplier * this.backoffTime, this.maximumBackoff);
    }

    /**
     * Schedules a new {@link TweetStreamStarter} after the current backoff delay and doubles
     * the multiplier for the following attempt. Does nothing once the executor has been shut
     * down.
     *
     * @throws ProcessException if the configured number of attempts has already been used.
     *         This runs on the executor thread, where a thrown exception is not reported to
     *         any caller, so the failure is also logged at error level.
     */
    private void scheduleStartStreamWithBackoff() {
        if (this.executorService.isShutdown()) {
            // stop() has been called; reconnecting would only be rejected by the executor.
            this.logger.debug("Stream service is stopped; not scheduling a new stream connection");
            return;
        }
        if (this.attemptCounter >= this.backoffAttempts) {
            final String message = String.format("Connection failed after maximum attempts [%d]", this.attemptCounter);
            this.logger.error(message);
            throw new ProcessException(message);
        }
        this.attemptCounter++;
        final long delaySeconds = calculateBackoffDelay();
        // Saturate instead of wrapping around when the multiplier can no longer be doubled.
        if (this.backoffMultiplier <= Long.MAX_VALUE / 2) {
            this.backoffMultiplier *= 2;
        }
        this.logger.info("Scheduling new stream connection after delay [{} s]", new Object[]{delaySeconds});
        this.executorService.schedule(new TweetStreamStarter(), delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Returns the backoff state to its initial values (no attempts used, multiplier of one).
     */
    private void resetBackoff() {
        this.attemptCounter = 0;
        this.backoffMultiplier = 1L;
    }

    /**
     * Splits a comma-separated property value into a set of trimmed entries.
     *
     * @param processContext     context to read the property from
     * @param propertyDescriptor property holding the comma-separated list
     * @return the trimmed entries, or {@code null} when the property is not set; the
     *         {@code null} is passed through unchanged to the Twitter API calls
     */
    private Set<String> parseCommaSeparatedProperties(ProcessContext processContext, PropertyDescriptor propertyDescriptor) {
        if (!processContext.getProperty(propertyDescriptor).isSet()) {
            return null;
        }
        final Set<String> values = new HashSet<>();
        for (String entry : processContext.getProperty(propertyDescriptor).getValue().split(",")) {
            values.add(entry.trim());
        }
        return values;
    }
}
