package storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import kafka.message.Message;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;

/* loaded from: input_file:storm/kafka/KafkaJavaApiSpout.class */
public class KafkaJavaApiSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    KafkaConsumer consumer;
    ConcurrentMap<Long, Values> messages;
    long pollTimeout;
    long maxFailCount;

    /* loaded from: input_file:storm/kafka/KafkaJavaApiSpout$MessageAndRealOffset.class */
    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message message, long j) {
            this.msg = message;
            this.offset = j;
        }
    }

    public KafkaJavaApiSpout() {
    }

    public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        LOG.debug("Opening KafkaJavaApiSpout...");
        if (map.containsKey("kafka.poll.timeout")) {
            this.pollTimeout = ((Long) map.get("kafka.poll.timeout")).longValue();
        } else {
            this.pollTimeout = 100L;
        }
        if (map.containsKey("kafka.max.fails")) {
            this.maxFailCount = ((Long) map.get("kafka.max.fails")).longValue();
        } else {
            this.maxFailCount = 5L;
        }
        if (this.consumer == null) {
            this.consumer = new KafkaConsumer(map);
        }
        this.messages = new ConcurrentHashMap();
        if (this._spoutConfig.topic == null) {
            this.consumer.subscribe(this._spoutConfig.topics);
        } else {
            this.consumer.subscribe(Collections.singletonList(this._spoutConfig.topic));
        }
    }

    public void nextTuple() {
        LOG.debug("Polling next tuple...");
        for (ConsumerRecord consumerRecord : this.consumer.poll(this.pollTimeout)) {
            Values values = new Values(new Object[]{consumerRecord.key(), consumerRecord.value(), consumerRecord.topic(), Long.valueOf(this.maxFailCount)});
            this.messages.putIfAbsent(Long.valueOf(consumerRecord.offset()), values);
            this._collector.emit(values, Long.valueOf(consumerRecord.offset()));
        }
    }

    public void fail(Object obj) {
        LOG.debug("Message with offset {} failed", obj);
        Values values = this.messages.get(obj);
        Long l = (Long) values.get(3);
        if (l.longValue() < 1) {
            LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", obj);
        } else {
            values.set(3, Long.valueOf(l.longValue() - 1));
            this._collector.emit(values, obj);
        }
    }

    public void ack(Object obj) {
        LOG.debug("Message with offset {} proceeded successfully", obj);
        this.messages.remove(obj);
    }

    public void close() {
        this.consumer.close();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{FieldNameBasedTupleToKafkaMapper.BOLT_KEY, "message", "topic", "attempt"}));
    }
}
