package storm.kafka;

import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.trident.MaxMetric;

/* loaded from: input_file:storm/kafka/PartitionManager.class */
public class PartitionManager {
    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;
    Long _emittedToOffset;
    Long _committedTo;
    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;
    SortedSet<Long> _pending = new TreeSet();
    SortedSet<Long> failed = new TreeSet();
    LinkedList<KafkaSpout.MessageAndRealOffset> _waitingToEmit = new LinkedList<>();
    long numberFailed = 0;
    long numberAcked = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/kafka/PartitionManager$KafkaMessageId.class */
    public static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long j) {
            this.partition = partition;
            this.offset = j;
        }
    }

    public PartitionManager(DynamicPartitionConnections dynamicPartitionConnections, String str, ZkState zkState, Map map, SpoutConfig spoutConfig, Partition partition) {
        this._partition = partition;
        this._connections = dynamicPartitionConnections;
        this._spoutConfig = spoutConfig;
        this._topologyInstanceId = str;
        this._consumer = dynamicPartitionConnections.register(partition.host, partition.partition);
        this._state = zkState;
        this._stormConf = map;
        String str2 = null;
        Long l = null;
        String committedPath = committedPath();
        try {
            Map<Object, Object> readJSON = this._state.readJSON(committedPath);
            LOG.info("Read partition information from: " + committedPath + "  --> " + readJSON);
            if (readJSON != null) {
                str2 = (String) ((Map) readJSON.get("topology")).get("id");
                l = (Long) readJSON.get("offset");
            }
        } catch (Throwable th) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath, th);
        }
        Long valueOf = Long.valueOf(KafkaUtils.getOffset(this._consumer, spoutConfig.topic, partition.partition, spoutConfig));
        if (str2 == null || l == null) {
            this._committedTo = valueOf;
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (str.equals(str2) || !spoutConfig.forceFromStart) {
            this._committedTo = l;
            LOG.info("Read last commit offset from zookeeper: " + this._committedTo + "; old topology_id: " + str2 + " - new topology_id: " + str);
        } else {
            this._committedTo = Long.valueOf(KafkaUtils.getOffset(this._consumer, spoutConfig.topic, partition.partition, spoutConfig.startOffsetTime));
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        }
        if (valueOf.longValue() - this._committedTo.longValue() > spoutConfig.maxOffsetBehind || this._committedTo.longValue() <= 0) {
            LOG.info("Last commit offset from zookeeper: " + this._committedTo);
            this._committedTo = valueOf;
            LOG.info("Commit offset " + this._committedTo + " is more than " + spoutConfig.maxOffsetBehind + " behind, resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
        }
        LOG.info("Starting Kafka " + this._consumer.host() + ":" + partition.partition + " from offset " + this._committedTo);
        this._emittedToOffset = this._committedTo;
        this._fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        this._fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        this._fetchAPICallCount = new CountMetric();
        this._fetchAPIMessageCount = new CountMetric();
    }

    public Map getMetricsDataMap() {
        HashMap hashMap = new HashMap();
        hashMap.put(this._partition + "/fetchAPILatencyMax", this._fetchAPILatencyMax.getValueAndReset());
        hashMap.put(this._partition + "/fetchAPILatencyMean", this._fetchAPILatencyMean.getValueAndReset());
        hashMap.put(this._partition + "/fetchAPICallCount", this._fetchAPICallCount.getValueAndReset());
        hashMap.put(this._partition + "/fetchAPIMessageCount", this._fetchAPIMessageCount.getValueAndReset());
        return hashMap;
    }

    public KafkaSpout.EmitState next(SpoutOutputCollector spoutOutputCollector) {
        if (this._waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            KafkaSpout.MessageAndRealOffset pollFirst = this._waitingToEmit.pollFirst();
            if (pollFirst == null) {
                return KafkaSpout.EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> generateTuples = KafkaUtils.generateTuples(this._spoutConfig, pollFirst.msg);
            if (generateTuples != null) {
                Iterator<List<Object>> it = generateTuples.iterator();
                while (it.hasNext()) {
                    spoutOutputCollector.emit(it.next(), new KafkaMessageId(this._partition, pollFirst.offset));
                }
                return !this._waitingToEmit.isEmpty() ? KafkaSpout.EmitState.EMITTED_MORE_LEFT : KafkaSpout.EmitState.EMITTED_END;
            }
            ack(Long.valueOf(pollFirst.offset));
        }
    }

    private void fill() {
        long nanoTime = System.nanoTime();
        boolean z = !this.failed.isEmpty();
        long longValue = z ? this.failed.first().longValue() : this._emittedToOffset.longValue();
        ByteBufferMessageSet fetchMessages = KafkaUtils.fetchMessages(this._spoutConfig, this._consumer, this._partition, longValue);
        long nanoTime2 = (System.nanoTime() - nanoTime) / 1000000;
        this._fetchAPILatencyMax.update(Long.valueOf(nanoTime2));
        this._fetchAPILatencyMean.update(Long.valueOf(nanoTime2));
        this._fetchAPICallCount.incr();
        if (fetchMessages != null) {
            int i = 0;
            Iterator it = fetchMessages.iterator();
            while (it.hasNext()) {
                MessageAndOffset messageAndOffset = (MessageAndOffset) it.next();
                Long valueOf = Long.valueOf(messageAndOffset.offset());
                if (valueOf.longValue() >= longValue && (!z || this.failed.contains(valueOf))) {
                    i++;
                    this._pending.add(valueOf);
                    this._waitingToEmit.add(new KafkaSpout.MessageAndRealOffset(messageAndOffset.message(), valueOf.longValue()));
                    this._emittedToOffset = Long.valueOf(Math.max(messageAndOffset.nextOffset(), this._emittedToOffset.longValue()));
                    if (z) {
                        this.failed.remove(valueOf);
                    }
                }
            }
            this._fetchAPIMessageCount.incrBy(i);
        }
    }

    public void ack(Long l) {
        if (!this._pending.isEmpty() && this._pending.first().longValue() < l.longValue() - this._spoutConfig.maxOffsetBehind) {
            this._pending.headSet(Long.valueOf(l.longValue() - this._spoutConfig.maxOffsetBehind)).clear();
        }
        this._pending.remove(l);
        this.numberAcked++;
    }

    public void fail(Long l) {
        if (l.longValue() < this._emittedToOffset.longValue() - this._spoutConfig.maxOffsetBehind) {
            LOG.info("Skipping failed tuple at offset=" + l + " because it's more than maxOffsetBehind=" + this._spoutConfig.maxOffsetBehind + " behind _emittedToOffset=" + this._emittedToOffset);
            return;
        }
        LOG.debug("failing at offset=" + l + " with _pending.size()=" + this._pending.size() + " pending and _emittedToOffset=" + this._emittedToOffset);
        this.failed.add(l);
        this.numberFailed++;
        if (this.numberAcked == 0 && this.numberFailed > this._spoutConfig.maxOffsetBehind) {
            throw new RuntimeException("Too many tuple failures");
        }
    }

    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (this._committedTo.longValue() == lastCompletedOffset) {
            LOG.debug("No new offset for " + this._partition + " for topology: " + this._topologyInstanceId);
            return;
        }
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + this._partition + " for topology: " + this._topologyInstanceId);
        this._state.writeJSON(committedPath(), ImmutableMap.builder().put("topology", ImmutableMap.of("id", this._topologyInstanceId, "name", this._stormConf.get("topology.name"))).put("offset", Long.valueOf(lastCompletedOffset)).put("partition", Integer.valueOf(this._partition.partition)).put("broker", ImmutableMap.of("host", this._partition.host.host, "port", Integer.valueOf(this._partition.host.port))).put(KafkaBolt.TOPIC, this._spoutConfig.topic).build());
        this._committedTo = Long.valueOf(lastCompletedOffset);
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + this._partition + " for topology: " + this._topologyInstanceId);
    }

    private String committedPath() {
        return this._spoutConfig.zkRoot + "/" + this._spoutConfig.id + "/" + this._partition.getId();
    }

    public long lastCompletedOffset() {
        return this._pending.isEmpty() ? this._emittedToOffset.longValue() : this._pending.first().longValue();
    }

    public Partition getPartition() {
        return this._partition;
    }

    public void close() {
        this._connections.unregister(this._partition.host, this._partition.partition);
    }
}
