/*
 * Decompiled with CFR 0.152.
 */
package storm.kafka;

import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.ICombiner;
import backtype.storm.metric.api.IReducer;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.DynamicPartitionConnections;
import storm.kafka.ExponentialBackoffMsgRetryManager;
import storm.kafka.FailedMsgRetryManager;
import storm.kafka.KafkaSpoutOld;
import storm.kafka.KafkaUtils;
import storm.kafka.MessageMetadataSchemeAsMultiScheme;
import storm.kafka.Partition;
import storm.kafka.SpoutConfig;
import storm.kafka.TopicOffsetOutOfRangeException;
import storm.kafka.ZkState;
import storm.kafka.trident.MaxMetric;

public class PartitionManager {
    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);

    // Fetch metrics; updated in fill() and drained by getMetricsDataMap().
    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    // Offset the next fetch of new messages starts from; fill() raises it to the
    // nextOffset() of every message it queues.
    Long _emittedToOffset;
    // Offsets queued by fill() and not yet removed by ack(), mapped to the
    // System.currentTimeMillis() value at the time they were first queued.
    private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>();
    // Tracks failed offsets and decides when each one is retried.
    private final FailedMsgRetryManager _failedMsgRetryManager;
    // Start offset chosen by the constructor, then the offset most recently written by commit().
    Long _committedTo;
    // Messages fetched by fill() that next() has not emitted yet.
    LinkedList<KafkaSpoutOld.MessageAndRealOffset> _waitingToEmit = new LinkedList<KafkaSpoutOld.MessageAndRealOffset>();
    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    // Consumer obtained from _connections.register(...) in the constructor.
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;
    // Counters incremented by fail() and ack(); fail() uses them to abort when
    // nothing has ever been acked and failures exceed maxOffsetBehind.
    long numberFailed;
    long numberAcked;

    /**
     * Registers a consumer for the given partition and decides the offset consumption starts from.
     *
     * <p>The start offset is resolved as follows:
     * <ol>
     *   <li>If no usable state (topology id and offset) can be read from the committed path,
     *       the offset returned by {@code KafkaUtils.getOffset} for the spout config is used.</li>
     *   <li>If the stored topology id differs from {@code topologyInstanceId} and
     *       {@code spoutConfig.ignoreZkOffsets} is set, the offset for
     *       {@code spoutConfig.startOffsetTime} is used.</li>
     *   <li>Otherwise the stored offset is used.</li>
     * </ol>
     * Finally, if the chosen offset is more than {@code spoutConfig.maxOffsetBehind} behind the
     * config-derived offset, or is not positive, it is replaced by the config-derived offset.
     *
     * @param connections         connection registry used to obtain (and later release) the consumer
     * @param topologyInstanceId  id of the running topology instance
     * @param state               store the committed offset is read from and written to
     * @param stormConf           topology configuration map
     * @param spoutConfig         spout settings (retry delays, offsets policy, ZK paths)
     * @param id                  the partition this manager is responsible for
     */
    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.topic, id.partition);
        _state = state;
        _stormConf = stormConf;
        numberFailed = 0L;
        numberAcked = 0L;
        _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs, _spoutConfig.retryDelayMultiplier, _spoutConfig.retryDelayMaxMs);

        // Try to recover the previously committed state; any failure is logged and
        // treated the same as "nothing stored".
        final String zkPath = committedPath();
        String storedTopologyId = null;
        Long storedOffset = null;
        try {
            Map<Object, Object> stored = _state.readJSON(zkPath);
            LOG.info("Read partition information from: " + zkPath + "  --> " + stored);
            if (stored != null) {
                Map topologyInfo = (Map) stored.get("topology");
                storedTopologyId = (String) topologyInfo.get("id");
                storedOffset = (Long) stored.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + zkPath, e);
        }

        final String topic = _partition.topic;
        final Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);

        final boolean hasStoredState = storedTopologyId != null && storedOffset != null;
        if (!hasStoredState) {
            _committedTo = currentOffset;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(storedTopologyId) && spoutConfig.ignoreZkOffsets) {
            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
        } else {
            _committedTo = storedOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + storedTopologyId + " - new topology_id: " + topologyInstanceId);
        }

        // Give up on a start offset that is too far behind (or non-positive) and jump
        // to the config-derived offset instead.
        final long lag = currentOffset - _committedTo;
        if (lag > spoutConfig.maxOffsetBehind || _committedTo <= 0L) {
            LOG.info("Last commit offset from zookeeper: " + _committedTo);
            Long previousCommit = _committedTo;
            _committedTo = currentOffset;
            LOG.info("Commit offset " + previousCommit + " is more than " + spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }

        LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric((ICombiner) new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric((IReducer) new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    /**
     * Drains the four fetch metrics of this partition into a map.
     *
     * <p>Keys are the partition's string form followed by {@code /fetchAPILatencyMax},
     * {@code /fetchAPILatencyMean}, {@code /fetchAPICallCount} and {@code /fetchAPIMessageCount}.
     * Each metric is reset as its value is read.
     *
     * @return a new map holding the metric values collected since the previous call
     */
    public Map getMetricsDataMap() {
        final String keyPrefix = _partition + "/";
        HashMap<String, Object> metrics = new HashMap<String, Object>();
        metrics.put(keyPrefix + "fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        metrics.put(keyPrefix + "fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        metrics.put(keyPrefix + "fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        metrics.put(keyPrefix + "fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return metrics;
    }

    /**
     * Emits the tuples of the next queued message that deserializes to at least one tuple.
     *
     * <p>If the queue is empty a fetch is attempted first. Messages that produce no tuples
     * are acked immediately and skipped. Every tuple of the chosen message is emitted with
     * its own {@link KafkaMessageId}; when {@code topicAsStreamId} is set, the configured
     * topic is used as the stream id.
     *
     * @param collector the collector tuples are emitted to
     * @return {@code NO_EMITTED} if the queue ran dry without emitting anything, otherwise
     *         {@code EMITTED_MORE_LEFT} or {@code EMITTED_END} depending on whether messages
     *         remain queued
     */
    public KafkaSpoutOld.EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }

        boolean emitted = false;
        while (!emitted) {
            KafkaSpoutOld.MessageAndRealOffset head = _waitingToEmit.pollFirst();
            if (head == null) {
                return KafkaSpoutOld.EmitState.NO_EMITTED;
            }

            Iterable<List<Object>> tuples;
            if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
                tuples = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, head.msg, _partition, head.offset);
            } else {
                tuples = KafkaUtils.generateTuples(_spoutConfig, head.msg, _partition.topic);
            }

            // Nothing to emit for this message: mark it done and try the next one.
            if (tuples == null || !tuples.iterator().hasNext()) {
                ack(head.offset);
                continue;
            }

            final boolean useTopicStream = _spoutConfig.topicAsStreamId;
            for (List<Object> tuple : tuples) {
                Object messageId = new KafkaMessageId(_partition, head.offset);
                if (useTopicStream) {
                    collector.emit(_spoutConfig.topic, tuple, messageId);
                } else {
                    collector.emit(tuple, messageId);
                }
            }
            emitted = true;
        }

        return _waitingToEmit.isEmpty()
                ? KafkaSpoutOld.EmitState.EMITTED_END
                : KafkaSpoutOld.EmitState.EMITTED_MORE_LEFT;
    }

    /**
     * Fetches one batch from Kafka and queues its messages for emission.
     *
     * <p>If the retry manager reports a failed offset that is due, the fetch starts there and
     * only messages the retry manager wants retried are queued; otherwise the fetch starts at
     * {@code _emittedToOffset} and every message at or after that offset is queued. Queued
     * offsets are recorded in {@code _pending} and {@code _emittedToOffset} is raised to the
     * highest {@code nextOffset()} seen.
     *
     * <p>When the fetch offset is out of range, the earliest offset is looked up and the
     * method returns without updating the fetch metrics:
     * <ul>
     *   <li>new-message fetch: {@code _emittedToOffset} is reset to the earliest offset;</li>
     *   <li>retry fetch: failed offsets below the earliest offset are dropped from the retry
     *       manager and from {@code _pending} (they can no longer be fetched, and leaving them
     *       pending would hold back the committed offset), and {@code _emittedToOffset} is only
     *       moved forward, never backward, so already-emitted messages are not re-read.</li>
     * </ul>
     */
    private void fill() {
        long start = System.nanoTime();
        Long retryOffset = this._failedMsgRetryManager.nextFailedMessageToRetry();
        boolean processingNewTuples = retryOffset == null;
        long offset = processingNewTuples ? this._emittedToOffset : retryOffset;

        ByteBufferMessageSet msgs = null;
        try {
            msgs = KafkaUtils.fetchMessages(this._spoutConfig, this._consumer, this._partition, offset);
        }
        catch (TopicOffsetOutOfRangeException e) {
            long earliest = KafkaUtils.getOffset(this._consumer, this._partition.topic, this._partition.partition, OffsetRequest.EarliestTime());
            if (processingNewTuples) {
                // The position for new messages itself is invalid: restart from the earliest offset.
                this._emittedToOffset = earliest;
                LOG.warn("{} Using new offset: {}", this._partition.partition, (Object)this._emittedToOffset);
            } else {
                // Only the retried offset is invalid. Forget failed/pending offsets that no
                // longer exist, and do not rewind the position for new messages.
                Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(earliest);
                this._pending.headMap(earliest).clear();
                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
                if (earliest > this._emittedToOffset) {
                    this._emittedToOffset = earliest;
                    LOG.warn("{} Using new offset: {}", this._partition.partition, (Object)this._emittedToOffset);
                }
            }
            return;
        }

        long millis = (System.nanoTime() - start) / 1000000L;
        this._fetchAPILatencyMax.update((Object)Long.valueOf(millis));
        this._fetchAPILatencyMean.update((Object)Long.valueOf(millis));
        this._fetchAPICallCount.incr();

        if (msgs == null) {
            return;
        }
        int numMessages = 0;
        for (MessageAndOffset msg : msgs) {
            Long curOffset = msg.offset();
            // The fetched batch may begin before the requested offset; skip those messages.
            if (curOffset < offset) {
                continue;
            }
            // During a retry fetch, only re-queue messages the retry manager wants retried.
            if (!processingNewTuples && !this._failedMsgRetryManager.shouldRetryMsg(curOffset)) {
                continue;
            }
            ++numMessages;
            // Keep the original timestamp if this offset is already pending.
            if (!this._pending.containsKey(curOffset)) {
                this._pending.put(curOffset, System.currentTimeMillis());
            }
            this._waitingToEmit.add(new KafkaSpoutOld.MessageAndRealOffset(msg.message(), curOffset));
            this._emittedToOffset = Math.max(msg.nextOffset(), this._emittedToOffset);
            if (this._failedMsgRetryManager.shouldRetryMsg(curOffset)) {
                this._failedMsgRetryManager.retryStarted(curOffset);
            }
        }
        this._fetchAPIMessageCount.incrBy((long)numMessages);
    }

    /**
     * Marks the message at {@code offset} as fully processed.
     *
     * <p>Before removing the offset, any pending entries more than
     * {@code maxOffsetBehind} below it are discarded. The retry manager is
     * notified and the ack counter is incremented.
     *
     * @param offset offset of the acknowledged message
     */
    public void ack(Long offset) {
        if (!_pending.isEmpty()) {
            final long cutoff = offset - _spoutConfig.maxOffsetBehind;
            if (_pending.firstKey() < cutoff) {
                // Drop everything that has fallen too far behind the acked offset.
                _pending.headMap(cutoff).clear();
            }
        }
        _pending.remove(offset);
        _failedMsgRetryManager.acked(offset);
        numberAcked++;
    }

    /**
     * Records a failed tuple so that it can be retried.
     *
     * <p>Offsets more than {@code maxOffsetBehind} below {@code _emittedToOffset} are only
     * logged and otherwise ignored. For all other offsets the failure counter is incremented
     * and the offset is handed to the retry manager.
     *
     * @param offset offset of the failed message
     * @throws RuntimeException if nothing has been acked so far and the number of failures
     *                          exceeds {@code maxOffsetBehind}
     */
    public void fail(Long offset) {
        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
            LOG.info("Skipping failed tuple at offset=" + offset + " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind + " behind _emittedToOffset=" + _emittedToOffset);
            return;
        }

        LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
        numberFailed++;
        final boolean nothingAckedYet = numberAcked == 0L;
        if (nothingAckedYet && numberFailed > _spoutConfig.maxOffsetBehind) {
            throw new RuntimeException("Too many tuple failures");
        }
        _failedMsgRetryManager.failed(offset);
    }

    /**
     * Writes the last completed offset to the state store if it changed since the last commit.
     *
     * <p>The stored document contains the topology id and name, the offset, the partition
     * number, the broker host and port, and the topic. {@code _committedTo} is updated only
     * after the write call returns.
     */
    public void commit() {
        long lastCompletedOffset = this.lastCompletedOffset();
        if (this._committedTo != lastCompletedOffset) {
            LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + this._partition + " for topology: " + this._topologyInstanceId);
            // Typed builder: the values are heterogeneous (maps, numbers, strings), so they
            // are stored as plain Objects rather than being cast to a common type.
            ImmutableMap<Object, Object> data = ImmutableMap.<Object, Object>builder()
                    .put("topology", ImmutableMap.of("id", this._topologyInstanceId, "name", this._stormConf.get("topology.name")))
                    .put("offset", lastCompletedOffset)
                    .put("partition", this._partition.partition)
                    .put("broker", ImmutableMap.of("host", this._partition.host.host, "port", this._partition.host.port))
                    .put("topic", this._partition.topic)
                    .build();
            this._state.writeJSON(this.committedPath(), data);
            this._committedTo = lastCompletedOffset;
            LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + this._partition + " for topology: " + this._topologyInstanceId);
        } else {
            LOG.debug("No new offset for " + this._partition + " for topology: " + this._topologyInstanceId);
        }
    }

    /**
     * Builds the state-store path for this partition's committed offset:
     * {@code <zkRoot>/<spout id>/<partition id>}.
     *
     * @return the path used by the constructor to read and by {@code commit()} to write
     */
    private String committedPath() {
        StringBuilder path = new StringBuilder();
        path.append(_spoutConfig.zkRoot);
        path.append("/");
        path.append(_spoutConfig.id);
        path.append("/");
        path.append(_partition.getId());
        return path.toString();
    }

    /**
     * Returns the offset up to which processing is complete.
     *
     * @return the smallest pending offset, or {@code _emittedToOffset} when nothing is pending
     */
    public long lastCompletedOffset() {
        return _pending.isEmpty() ? _emittedToOffset : _pending.firstKey();
    }

    /**
     * @return the partition this manager was created for
     */
    public Partition getPartition() {
        return _partition;
    }

    /**
     * Commits the last completed offset and releases this partition's consumer registration.
     *
     * <p>The registration is released even when the commit throws, so a failed write to the
     * state store does not leave the connection registered. The commit exception, if any,
     * still propagates to the caller.
     */
    public void close() {
        try {
            this.commit();
        } finally {
            this._connections.unregister(this._partition.host, this._partition.topic, this._partition.partition);
        }
    }

    /**
     * Message id attached to every emitted tuple: the partition the message came from
     * and the message's offset within it.
     */
    static class KafkaMessageId {
        /** Partition the message was read from. */
        public Partition partition;
        /** Offset of the message. */
        public long offset;

        /**
         * @param partition partition the message was read from
         * @param offset    offset of the message
         */
        public KafkaMessageId(Partition partition, long offset) {
            this.offset = offset;
            this.partition = partition;
        }
    }
}

