package storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;

/* loaded from: input_file:storm/kafka/KafkaJavaApiSpout.class */
public class KafkaJavaApiSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
    private KafkaConsumer<String, byte[]> consumer;
    private Iterator<ConsumerRecord<String, byte[]>> it;
    private Map<TopicPartition, OffsetAndMetadata> toBeCommitted;
    private AtomicBoolean rebalanceFlag;
    private int batchUpperLimit;
    private int maxBatchDurationMillis;
    private List<String> topicList;
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    long pollTimeout;
    long maxFailCount;
    private final List<Values> messagesList = new ArrayList();
    private Lock lock = new ReentrantLock();
    ConcurrentMap<Long, Values> messages = new ConcurrentHashMap();

    /* loaded from: input_file:storm/kafka/KafkaJavaApiSpout$KafkaSpoutConstants.class */
    public class KafkaSpoutConstants {
        public static final String BATCH_SIZE = "batchSize";
        public static final String BATCH_DURATION_MS = "batchDurationMillis";
        public static final int DEFAULT_BATCH_SIZE = 1000;
        public static final int DEFAULT_BATCH_DURATION = 1000;

        public KafkaSpoutConstants() {
        }
    }

    public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        LOG.debug("Opening KafkaJavaApiSpout...");
        if (map.containsKey("kafka.poll.timeout")) {
            this.pollTimeout = ((Long) map.get("kafka.poll.timeout")).longValue();
        } else {
            this.pollTimeout = 1000L;
        }
        if (map.containsKey("kafka.max.fails")) {
            this.maxFailCount = ((Long) map.get("kafka.max.fails")).longValue();
        } else {
            this.maxFailCount = 5L;
        }
        this.toBeCommitted = new HashMap();
        this.rebalanceFlag = new AtomicBoolean(false);
        this.topicList = this._spoutConfig.topic == null ? this._spoutConfig.topics : Collections.singletonList(this._spoutConfig.topic);
        if (this.topicList == null || this.topicList.isEmpty()) {
            throw new KafkaException("At least one Kafka topic must be specified.");
        }
        this.batchUpperLimit = map.containsKey(KafkaSpoutConstants.BATCH_SIZE) ? ((Integer) map.get(KafkaSpoutConstants.BATCH_SIZE)).intValue() : 1000;
        this.maxBatchDurationMillis = map.containsKey(KafkaSpoutConstants.BATCH_DURATION_MS) ? ((Integer) map.get(KafkaSpoutConstants.BATCH_DURATION_MS)).intValue() : 1000;
        try {
            this.consumer = new KafkaConsumer<>(map);
            this.consumer.subscribe(this.topicList, new RebalanceListener(this.rebalanceFlag));
            this.it = this.consumer.poll(this.pollTimeout).iterator();
            LOG.info("Kafka spout started.");
        } catch (Exception e) {
            throw new KafkaException("Unable to create consumer. Check whether the Bootstrap server is up and that the Storm can connect to it.", e);
        }
    }

    public void nextTuple() {
        LOG.debug("Polling next tuple...");
        String uuid = UUID.randomUUID().toString();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            long currentTimeMillis2 = System.currentTimeMillis() + this.maxBatchDurationMillis;
            while (true) {
                if (this.messagesList.size() >= this.batchUpperLimit || System.currentTimeMillis() >= currentTimeMillis2) {
                    break;
                }
                if (this.it == null || !this.it.hasNext()) {
                    this.it = this.consumer.poll(Math.max(0L, currentTimeMillis2 - System.currentTimeMillis())).iterator();
                    if (this.rebalanceFlag.get()) {
                        this.rebalanceFlag.set(false);
                        break;
                    } else if (!this.it.hasNext()) {
                        LOG.debug("Returning with backoff. No more data to read");
                        break;
                    }
                }
                ConsumerRecord<String, byte[]> next = this.it.next();
                byte[] bArr = (byte[]) next.value();
                String str = (String) next.key();
                LOG.debug("Message: {}", new String(bArr));
                LOG.debug("Topic: {} Partition: {}", next.topic(), Integer.valueOf(next.partition()));
                Values values = new Values(new Object[]{str, bArr, next.topic(), Long.valueOf(this.maxFailCount)});
                this.messagesList.add(values);
                this.messages.putIfAbsent(Long.valueOf(next.offset()), values);
                LOG.debug("Waited: {} ", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                LOG.debug("Messages #: {}", Integer.valueOf(this.messagesList.size()));
                this.toBeCommitted.put(new TopicPartition(next.topic(), next.partition()), new OffsetAndMetadata(next.offset() + 1, uuid));
            }
            if (this.messagesList.size() > 0) {
                Iterator<Values> it = this.messagesList.iterator();
                while (it.hasNext()) {
                    this._collector.emit(it.next());
                }
                LOG.debug("Emitted {}", Integer.valueOf(this.messagesList.size()));
                this.messagesList.clear();
                try {
                    this.lock.lock();
                    this.consumer.commitSync(this.toBeCommitted);
                    this.toBeCommitted.clear();
                    this.lock.unlock();
                } catch (Throwable th) {
                    this.lock.unlock();
                    throw th;
                }
            }
        } catch (Exception e) {
            LOG.error("KafkaJavaApiSpout EXCEPTION, {}", e);
        }
    }

    public void fail(Object obj) {
        LOG.debug("Message with offset {} failed", obj);
        Values values = this.messages.get(obj);
        Long l = (Long) values.get(3);
        if (l.longValue() < 1) {
            LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", obj);
        } else {
            values.set(3, Long.valueOf(l.longValue() - 1));
            this._collector.emit(values, obj);
        }
    }

    public void ack(Object obj) {
        LOG.debug("Message with offset {} proceeded successfully", obj);
        this.messages.remove(obj);
    }

    public void close() {
        if (this.consumer != null) {
            try {
                this.lock.lock();
                this.consumer.wakeup();
                this.consumer.close();
            } finally {
                this.lock.unlock();
            }
        }
        LOG.info("Kafka Spout stopped.");
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE, "topic", "attempt"}));
    }
}
