/*
 * Decompiled with CFR 0.152.
 */
package storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.RebalanceListener;
import storm.kafka.SpoutConfig;

/**
 * Storm spout that polls records from one or more Kafka topics with the Kafka
 * Java consumer API, emits them in batches and then synchronously commits the
 * consumed offsets.
 *
 * <p>Each emitted tuple carries four fields, in the order declared by
 * {@link #declareOutputFields}: the record key, the raw record value, the
 * topic name and the number of remaining retry attempts.
 *
 * <p>Configuration keys read from the map passed to {@link #open}:
 * {@code kafka.poll.timeout} (default 1000), {@code kafka.max.fails}
 * (default 5), {@code batchSize} (default 1000) and
 * {@code batchDurationMillis} (default 1000). The same map is handed to the
 * {@link KafkaConsumer} constructor.
 */
public class KafkaJavaApiSpout
extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaJavaApiSpout.class);
    /** Tuples collected during the current batch and not yet emitted. */
    private final List<Values> messagesList = new ArrayList<Values>();
    private KafkaConsumer<String, byte[]> consumer;
    /** Iterator over the records returned by the most recent poll. */
    private Iterator<ConsumerRecord<String, byte[]>> it;
    /** Next offset to commit per partition, accumulated while batching. */
    private Map<TopicPartition, OffsetAndMetadata> toBeCommitted;
    /** Handed to the {@link RebalanceListener}; checked and reset after each poll. */
    private AtomicBoolean rebalanceFlag;
    private int batchUpperLimit;
    private int maxBatchDurationMillis;
    private List<String> topicList;
    /** Held around offset commits and around closing the consumer. */
    private final Lock lock = new ReentrantLock();
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    /** Tuples remembered by record offset so that {@link #fail} can re-emit them. */
    ConcurrentMap<Long, Values> messages = new ConcurrentHashMap<Long, Values>();
    long pollTimeout;
    long maxFailCount;

    /**
     * Creates the spout.
     *
     * @param spoutConfig supplies the topic ({@code topic}) or topics
     *                    ({@code topics}) to subscribe to
     */
    public KafkaJavaApiSpout(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    /**
     * Reads the spout settings from {@code conf}, creates the Kafka consumer,
     * subscribes to the configured topics and performs an initial poll.
     *
     * @param conf      configuration map; also passed to the Kafka consumer
     * @param context   topology context (not used here)
     * @param collector collector used to emit tuples
     * @throws KafkaException if no topic is configured or the consumer cannot
     *                        be created
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._collector = collector;
        LOG.debug("Opening KafkaJavaApiSpout...");
        // Numeric settings may arrive as Integer or Long depending on how the
        // configuration was built, so they are read through Number rather than
        // cast to one specific boxed type.
        this.pollTimeout = KafkaJavaApiSpout.readLong(conf, "kafka.poll.timeout", 1000L);
        this.maxFailCount = KafkaJavaApiSpout.readLong(conf, "kafka.max.fails", 5L);
        this.toBeCommitted = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.rebalanceFlag = new AtomicBoolean(false);
        // A single configured topic takes precedence over the topic list.
        this.topicList = this._spoutConfig.topic == null
                ? this._spoutConfig.topics
                : Collections.singletonList(this._spoutConfig.topic);
        if (this.topicList == null || this.topicList.isEmpty()) {
            throw new KafkaException("At least one Kafka topic must be specified.");
        }
        this.batchUpperLimit = KafkaJavaApiSpout.readInt(
                conf, KafkaSpoutConstants.BATCH_SIZE, KafkaSpoutConstants.DEFAULT_BATCH_SIZE);
        this.maxBatchDurationMillis = KafkaJavaApiSpout.readInt(
                conf, KafkaSpoutConstants.BATCH_DURATION_MS, KafkaSpoutConstants.DEFAULT_BATCH_DURATION);
        try {
            this.consumer = new KafkaConsumer<String, byte[]>(conf);
        }
        catch (Exception e) {
            throw new KafkaException("Unable to create consumer. Check whether the Bootstrap server is up and that the Storm can connect to it.", (Throwable)e);
        }
        this.consumer.subscribe(this.topicList, (ConsumerRebalanceListener)new RebalanceListener(this.rebalanceFlag));
        this.it = this.consumer.poll(this.pollTimeout).iterator();
        LOG.info("Kafka spout started.");
    }

    /**
     * Collects records until the batch size or the batch duration limit is
     * reached, emits the collected tuples and commits their offsets.
     *
     * <p>Collection stops early when a poll returns no records or when the
     * rebalance flag was raised during the poll. Nothing is emitted or
     * committed when no tuple was collected. Any exception is logged and
     * swallowed so that the method never throws.
     *
     * <p>NOTE(review): tuples are emitted without a message id while
     * {@link #messages} is still populated for every record and keyed by
     * offset only. Whether {@link #ack}/{@link #fail} are ever invoked for
     * these tuples, and whether offsets from different partitions can collide
     * in that map, should be confirmed against the topology's reliability
     * requirements.
     */
    public void nextTuple() {
        LOG.debug("Polling next tuple...");
        String batchUUID = UUID.randomUUID().toString();
        try {
            long batchStartTime = System.currentTimeMillis();
            long batchEndTime = batchStartTime + (long)this.maxBatchDurationMillis;
            while (this.messagesList.size() < this.batchUpperLimit && System.currentTimeMillis() < batchEndTime) {
                if (this.it == null || !this.it.hasNext()) {
                    // Wait at most for the time remaining in this batch window.
                    ConsumerRecords<String, byte[]> records =
                            this.consumer.poll(Math.max(0L, batchEndTime - System.currentTimeMillis()));
                    this.it = records.iterator();
                    if (this.rebalanceFlag.compareAndSet(true, false)) {
                        // Stop batching after a rebalance; the freshly polled
                        // records stay in the iterator for the next call.
                        break;
                    }
                    if (!this.it.hasNext()) {
                        LOG.debug("Returning with backoff. No more data to read");
                        break;
                    }
                }
                ConsumerRecord<String, byte[]> message = this.it.next();
                byte[] kafkaMessage = message.value();
                String kafkaKey = message.key();
                if (LOG.isDebugEnabled()) {
                    // The value may be null; decoding it unconditionally would
                    // throw and lose the record even with debug logging off.
                    LOG.debug("Message: {}", (Object)(kafkaMessage == null ? null : new String(kafkaMessage)));
                    LOG.debug("Topic: {} Partition: {}", (Object)message.topic(), (Object)message.partition());
                }
                Values tuple = new Values(new Object[]{kafkaKey, kafkaMessage, message.topic(), this.maxFailCount});
                this.messagesList.add(tuple);
                this.messages.putIfAbsent(message.offset(), tuple);
                LOG.debug("Waited: {} ", (Object)(System.currentTimeMillis() - batchStartTime));
                LOG.debug("Messages #: {}", (Object)this.messagesList.size());
                // The committed offset is the position of the next record to read.
                long nextOffset = message.offset() + 1L;
                this.toBeCommitted.put(
                        new TopicPartition(message.topic(), message.partition()),
                        new OffsetAndMetadata(nextOffset, batchUUID));
            }
            if (this.messagesList.isEmpty()) {
                return;
            }
            for (Values tuple : this.messagesList) {
                this._collector.emit((List)tuple);
            }
            LOG.debug("Emitted {}", (Object)this.messagesList.size());
            this.messagesList.clear();
            this.commitPendingOffsets();
        }
        catch (Exception e) {
            // Pass the exception as the throwable argument so that the stack
            // trace is logged and no unfilled placeholder is left in the text.
            LOG.error("KafkaJavaApiSpout EXCEPTION", e);
        }
    }

    /**
     * Handles a failed tuple: re-emits it with a decremented attempt counter,
     * or forgets it once the counter is exhausted.
     *
     * @param msgId id of the failed tuple; looked up as a key of {@link #messages}
     */
    public void fail(Object msgId) {
        LOG.debug("Message with offset {} failed", msgId);
        // ConcurrentHashMap rejects null keys, so guard before the lookup.
        Values message = msgId == null ? null : this.messages.get(msgId);
        if (message == null) {
            LOG.warn("Message with offset {} is not tracked by this spout. Skipping...", msgId);
            return;
        }
        Long currentAttempt = (Long)message.get(3);
        if (currentAttempt < 1L) {
            LOG.debug("Message with offset {} reached maximum fail attempts. Skipping...", msgId);
            // The tuple will not be replayed again, so stop tracking it.
            this.messages.remove(msgId);
        } else {
            message.set(3, (Object)(currentAttempt - 1L));
            this._collector.emit((List)message, msgId);
        }
    }

    /**
     * Handles a successfully processed tuple by dropping it from {@link #messages}.
     *
     * @param msgId id of the acknowledged tuple
     */
    public void ack(Object msgId) {
        LOG.debug("Message with offset {} proceeded successfully", msgId);
        if (msgId != null) {
            this.messages.remove(msgId);
        }
    }

    /**
     * Wakes up and closes the Kafka consumer, if one was created, while
     * holding the same lock that guards offset commits.
     */
    public void close() {
        if (this.consumer != null) {
            // Acquire outside the try block so that unlock() only runs when
            // the lock is actually held.
            this.lock.lock();
            try {
                this.consumer.wakeup();
                this.consumer.close();
            }
            finally {
                this.lock.unlock();
            }
        }
        LOG.info("Kafka Spout stopped.");
    }

    /**
     * Declares the output fields {@code key}, {@code message}, {@code topic}
     * and {@code attempt}.
     *
     * @param declarer receives the field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(new String[]{"key", "message", "topic", "attempt"}));
    }

    /** Synchronously commits the accumulated offsets and clears them on success. */
    private void commitPendingOffsets() {
        this.lock.lock();
        try {
            this.consumer.commitSync(this.toBeCommitted);
            this.toBeCommitted.clear();
        }
        finally {
            this.lock.unlock();
        }
    }

    /** Reads a numeric setting as {@code long}, falling back to the default when absent or null. */
    private static long readLong(Map conf, String key, long defaultValue) {
        Object raw = conf.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            return ((Number)raw).longValue();
        }
        return Long.parseLong(raw.toString().trim());
    }

    /** Reads a numeric setting as {@code int}, falling back to the default when absent or null. */
    private static int readInt(Map conf, String key, int defaultValue) {
        Object raw = conf.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            return ((Number)raw).intValue();
        }
        return Integer.parseInt(raw.toString().trim());
    }

    /** Configuration keys and default values for the batching behaviour of the spout. */
    public class KafkaSpoutConstants {
        public static final String BATCH_SIZE = "batchSize";
        public static final String BATCH_DURATION_MS = "batchDurationMillis";
        public static final int DEFAULT_BATCH_SIZE = 1000;
        public static final int DEFAULT_BATCH_DURATION = 1000;
    }
}

