package storm.kafka;

import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaUtils;
import storm.kafka.PartitionManager;

/* loaded from: input_file:storm/kafka/KafkaSpoutOld.class */
public class KafkaSpoutOld extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutOld.class);
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    PartitionCoordinator _coordinator;
    DynamicPartitionConnections _connections;
    ZkState _state;
    long _lastUpdateMs = 0;
    int _currPartitionIndex = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/KafkaSpoutOld$EmitState.class */
    public enum EmitState {
        EMITTED_MORE_LEFT,
        EMITTED_END,
        NO_EMITTED
    }

    /* loaded from: input_file:storm/kafka/KafkaSpoutOld$MessageAndRealOffset.class */
    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message message, long j) {
            this.msg = message;
            this.offset = j;
        }
    }

    public KafkaSpoutOld(SpoutConfig spoutConfig) {
        this._spoutConfig = spoutConfig;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this._collector = spoutOutputCollector;
        String stormId = topologyContext.getStormId();
        HashMap hashMap = new HashMap(map);
        List<String> list = this._spoutConfig.zkServers;
        if (list == null) {
            list = (List) map.get("storm.zookeeper.servers");
        }
        Integer num = this._spoutConfig.zkPort;
        if (num == null) {
            num = Integer.valueOf(((Number) map.get("storm.zookeeper.port")).intValue());
        }
        hashMap.put("transactional.zookeeper.servers", list);
        hashMap.put("transactional.zookeeper.port", num);
        hashMap.put("transactional.zookeeper.root", this._spoutConfig.zkRoot);
        this._state = new ZkState(hashMap);
        this._connections = new DynamicPartitionConnections(this._spoutConfig, KafkaUtils.makeBrokerReader(map, this._spoutConfig));
        int size = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        if (this._spoutConfig.hosts instanceof StaticHosts) {
            this._coordinator = new StaticCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, stormId);
        } else {
            this._coordinator = new ZkCoordinator(this._connections, map, this._spoutConfig, this._state, topologyContext.getThisTaskIndex(), size, stormId);
        }
        topologyContext.registerMetric("kafkaOffset", new IMetric() { // from class: storm.kafka.KafkaSpoutOld.1
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;

            {
                this._kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(KafkaSpoutOld.this._connections);
            }

            public Object getValueAndReset() {
                List<PartitionManager> myManagedPartitions = KafkaSpoutOld.this._coordinator.getMyManagedPartitions();
                HashSet hashSet = new HashSet();
                Iterator<PartitionManager> it = myManagedPartitions.iterator();
                while (it.hasNext()) {
                    hashSet.add(it.next().getPartition());
                }
                this._kafkaOffsetMetric.refreshPartitions(hashSet);
                for (PartitionManager partitionManager : myManagedPartitions) {
                    this._kafkaOffsetMetric.setLatestEmittedOffset(partitionManager.getPartition(), partitionManager.lastCompletedOffset());
                }
                return this._kafkaOffsetMetric.getValueAndReset();
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
        topologyContext.registerMetric("kafkaPartition", new IMetric() { // from class: storm.kafka.KafkaSpoutOld.2
            public Object getValueAndReset() {
                List<PartitionManager> myManagedPartitions = KafkaSpoutOld.this._coordinator.getMyManagedPartitions();
                HashMap hashMap2 = new HashMap();
                Iterator<PartitionManager> it = myManagedPartitions.iterator();
                while (it.hasNext()) {
                    hashMap2.putAll(it.next().getMetricsDataMap());
                }
                return hashMap2;
            }
        }, this._spoutConfig.metricsTimeBucketSizeInSecs);
    }

    public void close() {
        this._state.close();
    }

    public void nextTuple() {
        EmitState next;
        List<PartitionManager> myManagedPartitions = this._coordinator.getMyManagedPartitions();
        for (int i = 0; i < myManagedPartitions.size(); i++) {
            try {
                this._currPartitionIndex %= myManagedPartitions.size();
                next = myManagedPartitions.get(this._currPartitionIndex).next(this._collector);
                if (next != EmitState.EMITTED_MORE_LEFT) {
                    this._currPartitionIndex = (this._currPartitionIndex + 1) % myManagedPartitions.size();
                }
            } catch (FailedFetchException e) {
                LOG.warn("Fetch failed", (Throwable) e);
                this._coordinator.refresh();
            }
            if (next != EmitState.NO_EMITTED) {
                break;
            }
        }
        long currentTimeMillis = System.currentTimeMillis() - this._lastUpdateMs;
        if (currentTimeMillis > this._spoutConfig.stateUpdateIntervalMs || currentTimeMillis < 0) {
            commit();
        }
    }

    public void ack(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.ack(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void fail(Object obj) {
        PartitionManager.KafkaMessageId kafkaMessageId = (PartitionManager.KafkaMessageId) obj;
        PartitionManager manager = this._coordinator.getManager(kafkaMessageId.partition);
        if (manager != null) {
            manager.fail(Long.valueOf(kafkaMessageId.offset));
        }
    }

    public void deactivate() {
        commit();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        if (this._spoutConfig.topicAsStreamId) {
            outputFieldsDeclarer.declareStream(this._spoutConfig.topic, this._spoutConfig.scheme.getOutputFields());
        } else {
            outputFieldsDeclarer.declare(this._spoutConfig.scheme.getOutputFields());
        }
    }

    private void commit() {
        this._lastUpdateMs = System.currentTimeMillis();
        Iterator<PartitionManager> it = this._coordinator.getMyManagedPartitions().iterator();
        while (it.hasNext()) {
            it.next().commit();
        }
    }
}
