package storm.kafka;

import backtype.storm.metric.api.IMetric;
import backtype.storm.utils.Utils;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;
import storm.kafka.trident.IBrokerReader;
import storm.kafka.trident.StaticBrokerReader;
import storm.kafka.trident.ZkBrokerReader;

/* loaded from: input_file:storm/kafka/KafkaUtils.class */
/**
 * Static helpers for the Kafka spout: broker-reader creation, offset lookup,
 * message fetching, tuple generation and assignment of partitions to tasks.
 */
public class KafkaUtils {
    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);

    /**
     * Sentinel returned by {@link #getOffset(SimpleConsumer, String, int, long)}
     * when the offset response contains no offsets for the requested partition.
     */
    private static final int NO_OFFSET = -5;

    /**
     * Metric that reports, per partition and summed over all partitions, the
     * latest and earliest offsets reported by the broker, the latest offset
     * recorded through {@link #setLatestEmittedOffset(Partition, long)} and the
     * difference between latest broker offset and latest emitted offset
     * ("spout lag").
     */
    public static class KafkaOffsetMetric implements IMetric {
        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
        Set<Partition> _partitions;
        String _topic;
        DynamicPartitionConnections _connections;

        /**
         * @param str the topic whose offsets are queried
         * @param dynamicPartitionConnections source of the per-partition consumer connections
         */
        public KafkaOffsetMetric(String str, DynamicPartitionConnections dynamicPartitionConnections) {
            this._topic = str;
            this._connections = dynamicPartitionConnections;
        }

        /**
         * Records the latest emitted offset for a partition.
         *
         * @param partition the partition the offset belongs to
         * @param j the latest emitted offset
         */
        public void setLatestEmittedOffset(Partition partition, long j) {
            _partitionToOffset.put(partition, j);
        }

        /**
         * Builds the metric snapshot. No recorded state is cleared by this method.
         *
         * @return a map from metric name to value, or {@code null} when the
         *         metric cannot be computed: the partition set is unknown or
         *         does not match the recorded offsets, a partition has no
         *         connection, a partition reports no data, or any exception
         *         is thrown while querying
         */
        public Object getValueAndReset() {
            try {
                if (_partitions == null || _partitions.size() != _partitionToOffset.size()) {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                    return null;
                }

                long totalSpoutLag = 0;
                long totalEarliestTimeOffset = 0;
                long totalLatestTimeOffset = 0;
                long totalLatestEmittedOffset = 0;
                Map<String, Long> metrics = new HashMap<String, Long>();

                for (Map.Entry<Partition, Long> entry : _partitionToOffset.entrySet()) {
                    Partition partition = entry.getKey();
                    SimpleConsumer consumer = _connections.getConnection(partition);
                    if (consumer == null) {
                        LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                        return null;
                    }

                    long latestTimeOffset = getOffset(consumer, _topic, partition.partition, OffsetRequest.LatestTime());
                    long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, OffsetRequest.EarliestTime());
                    // getOffset signals "no offsets returned" with NO_OFFSET; without this
                    // check the sentinel would be reported as a real offset and skew the lag.
                    if (latestTimeOffset == 0 || latestTimeOffset == NO_OFFSET) {
                        LOG.warn("No data found in Kafka Partition " + partition.getId());
                        return null;
                    }

                    long latestEmittedOffset = entry.getValue();
                    long spoutLag = latestTimeOffset - latestEmittedOffset;
                    String id = partition.getId();
                    metrics.put(id + "/spoutLag", spoutLag);
                    metrics.put(id + "/earliestTimeOffset", earliestTimeOffset);
                    metrics.put(id + "/latestTimeOffset", latestTimeOffset);
                    metrics.put(id + "/latestEmittedOffset", latestEmittedOffset);

                    totalSpoutLag += spoutLag;
                    totalEarliestTimeOffset += earliestTimeOffset;
                    totalLatestTimeOffset += latestTimeOffset;
                    totalLatestEmittedOffset += latestEmittedOffset;
                }

                metrics.put("totalSpoutLag", totalSpoutLag);
                metrics.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
                metrics.put("totalLatestTimeOffset", totalLatestTimeOffset);
                metrics.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                return metrics;
            } catch (Throwable th) {
                // Best effort: a failure to compute the metric is logged and reported as "no value".
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", th);
                return null;
            }
        }

        /**
         * Replaces the known partition set and drops recorded offsets of
         * partitions that are not part of the new set.
         *
         * @param set the partitions currently handled
         */
        public void refreshPartitions(Set<Partition> set) {
            _partitions = set;
            for (Iterator<Partition> it = _partitionToOffset.keySet().iterator(); it.hasNext(); ) {
                Partition known = it.next();
                if (!set.contains(known)) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Creates the broker reader matching the configured hosts type.
     *
     * @param map configuration handed to {@link ZkBrokerReader} when the hosts are not static
     * @param kafkaConfig spout configuration providing {@code hosts} and {@code topic}
     * @return a {@link StaticBrokerReader} when {@code kafkaConfig.hosts} is a
     *         {@code StaticHosts}, otherwise a {@link ZkBrokerReader}
     */
    public static IBrokerReader makeBrokerReader(Map map, KafkaConfig kafkaConfig) {
        if (kafkaConfig.hosts instanceof StaticHosts) {
            StaticHosts staticHosts = (StaticHosts) kafkaConfig.hosts;
            return new StaticBrokerReader(staticHosts.getPartitionInformation());
        }
        return new ZkBrokerReader(map, kafkaConfig.topic, (ZkHosts) kafkaConfig.hosts);
    }

    /**
     * Looks up the start offset for a partition according to the configuration:
     * {@code kafkaConfig.startOffsetTime} is used when {@code forceFromStart}
     * is set, otherwise {@code OffsetRequest.LatestTime()}.
     *
     * @param simpleConsumer connection used for the offset request
     * @param str topic name
     * @param i partition number
     * @param kafkaConfig spout configuration
     * @return the offset, or {@code -5} when no offset was returned
     */
    public static long getOffset(SimpleConsumer simpleConsumer, String str, int i, KafkaConfig kafkaConfig) {
        long startOffsetTime = kafkaConfig.forceFromStart ? kafkaConfig.startOffsetTime : OffsetRequest.LatestTime();
        return getOffset(simpleConsumer, str, i, startOffsetTime);
    }

    /**
     * Requests a single offset before the given time for one partition.
     *
     * @param simpleConsumer connection used for the offset request
     * @param str topic name
     * @param i partition number
     * @param j time passed to the offset request (for example
     *          {@code OffsetRequest.LatestTime()} or {@code OffsetRequest.EarliestTime()})
     * @return the first offset of the response, or {@code -5} ({@code NO_OFFSET})
     *         when the response holds no offsets for the partition
     */
    public static long getOffset(SimpleConsumer simpleConsumer, String str, int i, long j) {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        // Ask for at most one offset for the partition.
        requestInfo.put(new TopicAndPartition(str, i), new PartitionOffsetRequestInfo(j, 1));
        kafka.javaapi.OffsetRequest request =
                new kafka.javaapi.OffsetRequest(requestInfo, OffsetRequest.CurrentVersion(), simpleConsumer.clientId());

        long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(str, i);
        return offsets.length > 0 ? offsets[0] : NO_OFFSET;
    }

    /**
     * Fetches messages of one partition starting at the given offset.
     *
     * @param kafkaConfig spout configuration (topic, fetch size, client id, max wait)
     * @param simpleConsumer connection used for the fetch
     * @param partition partition to read from
     * @param j offset to start fetching at
     * @return the fetched message set
     * @throws UpdateOffsetException when the broker reports the offset as out of
     *         range and {@code useStartOffsetTimeIfOffsetOutOfRange} is enabled
     * @throws FailedFetchException on a network error during the fetch or on any
     *         other error reported in the fetch response
     * @throws RuntimeException wrapping any non-network exception raised by the fetch call
     */
    public static ByteBufferMessageSet fetchMessages(KafkaConfig kafkaConfig, SimpleConsumer simpleConsumer, Partition partition, long j) throws UpdateOffsetException {
        String topic = kafkaConfig.topic;
        int partitionId = partition.partition;

        FetchResponse response;
        // Only the fetch call itself is guarded. The exceptions raised further down
        // (UpdateOffsetException, FailedFetchException) must reach the caller as-is
        // and must not be re-wrapped by the generic handler.
        try {
            response = simpleConsumer.fetch(new FetchRequestBuilder()
                    .addFetch(topic, partitionId, j, kafkaConfig.fetchSizeBytes)
                    .clientId(kafkaConfig.clientId)
                    .maxWait(kafkaConfig.fetchMaxWait)
                    .build());
        } catch (Exception e) {
            if (isNetworkError(e)) {
                LOG.warn("Network error when fetching messages:", e);
                throw new FailedFetchException(e);
            }
            throw new RuntimeException(e);
        }

        if (!response.hasError()) {
            return response.messageSet(topic, partitionId);
        }

        KafkaError error = KafkaError.getError(response.errorCode(topic, partitionId));
        if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange) {
            LOG.warn("Got fetch request with offset out of range: [" + j + "]; retrying with default start offset time from configuration. configured start offset time: [" + kafkaConfig.startOffsetTime + "]");
            throw new UpdateOffsetException();
        }
        String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
        LOG.error(message);
        throw new FailedFetchException(message);
    }

    /** Tells whether an exception raised by a fetch is treated as a network failure. */
    private static boolean isNetworkError(Exception e) {
        return e instanceof ConnectException
                || e instanceof SocketTimeoutException
                || e instanceof IOException
                || e instanceof UnresolvedAddressException;
    }

    /**
     * Turns a Kafka message into tuples using the configured scheme.
     *
     * @param kafkaConfig spout configuration providing the scheme
     * @param message the Kafka message
     * @return the deserialized tuples, or {@code null} when the message has no
     *         payload; the key is passed to the scheme only when it is present
     *         and the scheme is a {@code KeyValueSchemeAsMultiScheme}
     */
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message message) {
        ByteBuffer payload = message.payload();
        if (payload == null) {
            return null;
        }
        ByteBuffer key = message.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            return kafkaConfig.scheme.deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        }
        return kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
    }

    /**
     * Selects the partitions handled by one task: starting at the task index,
     * every {@code i}-th entry of the ordered partition list is assigned.
     *
     * @param globalPartitionInformation source of the ordered partition list
     * @param i total number of tasks
     * @param i2 zero-based index of the task; must be less than {@code i}
     * @return the partitions assigned to the task, possibly empty
     * @throws IllegalArgumentException when {@code i2} is not less than {@code i}
     */
    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation globalPartitionInformation, int i, int i2) {
        Preconditions.checkArgument(i2 < i, "task index must be less that total tasks");
        List<Partition> orderedPartitions = globalPartitionInformation.getOrderedPartitions();
        int partitionCount = orderedPartitions.size();
        if (partitionCount < i) {
            LOG.warn("there are more tasks than partitions (tasks: " + i + "; partitions: " + partitionCount + "), some tasks will be idle");
        }

        List<Partition> assigned = new ArrayList<Partition>();
        for (int index = i2; index < partitionCount; index += i) {
            assigned.add(orderedPartitions.get(index));
        }
        logPartitionMapping(i, i2, assigned);
        return assigned;
    }

    /**
     * Logs the assignment of a task: a warning when nothing is assigned,
     * otherwise the list of partitions at info level.
     *
     * @param i total number of tasks
     * @param i2 zero-based index of the task
     * @param list partitions assigned to the task
     */
    private static void logPartitionMapping(int i, int i2, List<Partition> list) {
        String prefix = taskId(i2, i);
        if (list.isEmpty()) {
            LOG.warn(prefix + "no partitions assigned");
        } else {
            LOG.info(prefix + "assigned " + list);
        }
    }

    /**
     * Formats a log prefix of the form {@code "Task [n/total] "}.
     *
     * @param i zero-based task index; printed one-based
     * @param i2 total number of tasks
     * @return the prefix, including the trailing space
     */
    public static String taskId(int i, int i2) {
        return "Task [" + (i + 1) + "/" + i2 + "] ";
    }
}
