/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureAdapter;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.StaleMetadataException;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fetcher<K, V> {
    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
    // Network client used to send fetch, metadata and list-offsets requests and to poll their futures.
    private final ConsumerNetworkClient client;
    // Clock used for timeout bookkeeping and retry back-off sleeps.
    private final Time time;
    // Passed to every FetchRequest together with maxWaitMs.
    private final int minBytes;
    private final int maxWaitMs;
    // Per-partition size passed to FetchRequest.PartitionData; also quoted in the record-too-large error.
    private final int fetchSize;
    // Sleep between retries of metadata and list-offsets requests.
    private final long retryBackoffMs;
    // When true, each record is validated with ensureValid() before deserialization.
    private final boolean checkCrcs;
    private final Metadata metadata;
    private final FetchManagerMetrics sensors;
    private final SubscriptionState subscriptions;
    // Parsed batches appended by handleFetchResponse and drained (then cleared) by fetchedRecords.
    private final List<PartitionRecords<K, V>> records;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    // Partition -> fetch offset that the broker reported as out of range; surfaced by throwIfOffsetOutOfRange.
    private final Map<TopicPartition, Long> offsetOutOfRangePartitions;
    // Topics for which a fetch returned TOPIC_AUTHORIZATION_FAILED; surfaced by throwIfUnauthorizedTopics.
    private final Set<String> unauthorizedTopics;
    // Partition -> fetch offset where a non-empty response yielded no parsable record; surfaced by throwIfRecordTooLarge.
    private final Map<TopicPartition, Long> recordTooLargePartitions;

    /**
     * Creates a fetcher that sends requests through {@code client} and tracks positions in
     * {@code subscriptions}.
     *
     * @param client            network client used for all requests
     * @param minBytes          value passed as the minimum bytes of every fetch request
     * @param maxWaitMs         value passed as the maximum wait of every fetch request
     * @param fetchSize         per-partition fetch size
     * @param checkCrcs         whether each record is validated before deserialization
     * @param keyDeserializer   deserializer applied to non-null record keys
     * @param valueDeserializer deserializer applied to non-null record values
     * @param metadata          cluster metadata used to locate partition leaders
     * @param subscriptions     assignment and position state
     * @param metrics           registry in which the fetch sensors are created
     * @param metricGrpPrefix   prefix of the metric group name
     * @param metricTags        tags attached to every registered metric
     * @param time              clock used for timeouts and back-off
     * @param retryBackoffMs    sleep between retries
     */
    public Fetcher(ConsumerNetworkClient client,
                   int minBytes,
                   int maxWaitMs,
                   int fetchSize,
                   boolean checkCrcs,
                   Deserializer<K> keyDeserializer,
                   Deserializer<V> valueDeserializer,
                   Metadata metadata,
                   SubscriptionState subscriptions,
                   Metrics metrics,
                   String metricGrpPrefix,
                   Map<String, String> metricTags,
                   Time time,
                   long retryBackoffMs) {
        // Collaborators.
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.time = time;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;

        // Tunables.
        this.minBytes = minBytes;
        this.maxWaitMs = maxWaitMs;
        this.fetchSize = fetchSize;
        this.checkCrcs = checkCrcs;
        this.retryBackoffMs = retryBackoffMs;

        // Buffers filled by fetch responses and drained by fetchedRecords().
        this.records = new LinkedList<PartitionRecords<K, V>>();
        this.offsetOutOfRangePartitions = new HashMap<TopicPartition, Long>();
        this.unauthorizedTopics = new HashSet<String>();
        this.recordTooLargePartitions = new HashMap<TopicPartition, Long>();

        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
    }

    /**
     * Sends one fetch request to every node returned by {@code createFetchRequests} and
     * registers a listener that hands successful responses to {@code handleFetchResponse}.
     * Failures are only logged at debug level.
     *
     * @param cluster cluster view used to look up partition leaders
     */
    public void initFetches(Cluster cluster) {
        Map<Node, FetchRequest> requestsByNode = this.createFetchRequests(cluster);
        for (Map.Entry<Node, FetchRequest> pending : requestsByNode.entrySet()) {
            final FetchRequest request = pending.getValue();
            Node target = pending.getKey();
            RequestFuture<ClientResponse> inFlight = this.client.send(target, ApiKeys.FETCH, request);
            inFlight.addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse response) {
                    Fetcher.this.handleFetchResponse(response, request);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    log.debug("Fetch failed", (Throwable) e);
                }
            });
        }
    }

    /**
     * Establishes a fetch position for every given partition that is assigned but not yet
     * fetchable: a pending reset is executed, a partition without a committed offset is
     * marked for reset and reset, and otherwise the position is set to the committed offset.
     *
     * @param partitions partitions to examine
     */
    public void updateFetchPositions(Set<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            boolean needsPosition = this.subscriptions.isAssigned(tp) && !this.subscriptions.isFetchable(tp);
            if (!needsPosition) {
                continue;
            }
            if (this.subscriptions.isOffsetResetNeeded(tp)) {
                this.resetOffset(tp);
            } else if (this.subscriptions.committed(tp) == null) {
                // Nothing committed: fall back to the reset strategy.
                this.subscriptions.needOffsetReset(tp);
                this.resetOffset(tp);
            } else {
                long committedOffset = this.subscriptions.committed(tp).offset();
                log.debug("Resetting offset for partition {} to the committed offset {}", (Object) tp, (Object) committedOffset);
                this.subscriptions.seek(tp, committedOffset);
            }
        }
    }

    /**
     * Fetches metadata without restricting the topic list.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return partition information keyed by topic
     */
    public Map<String, List<PartitionInfo>> getAllTopicMetadata(long timeout) {
        // A null list is forwarded by sendMetadataRequest as an empty topic list
        // (presumably meaning "all topics" to the broker -- confirm against MetadataRequest).
        final List<String> noTopicFilter = null;
        return this.getTopicMetadata(noTopicFilter, timeout);
    }

    /**
     * Requests metadata for the given topics, retrying on retriable failures until
     * {@code timeout} milliseconds have elapsed.
     *
     * @param topics  topics to look up; an empty list returns an empty map immediately,
     *                {@code null} is passed through to {@code sendMetadataRequest}
     * @param timeout maximum time to wait, in milliseconds
     * @return available partitions keyed by topic, for every topic in the response's cluster
     * @throws TopicAuthorizationException if the response lists unauthorized topics
     * @throws InvalidTopicException       if a topic is reported as invalid
     * @throws KafkaException              if a topic reports a non-retriable error
     * @throws TimeoutException            if no usable response arrives in time
     */
    public Map<String, List<PartitionInfo>> getTopicMetadata(List<String> topics, long timeout) {
        if (topics != null && topics.isEmpty()) {
            return Collections.emptyMap();
        }

        final long startMs = this.time.milliseconds();
        long remainingMs = timeout;
        do {
            RequestFuture<ClientResponse> future = this.sendMetadataRequest(topics);
            this.client.poll(future, remainingMs);

            if (future.failed() && !future.isRetriable()) {
                throw future.exception();
            }

            if (future.succeeded()) {
                MetadataResponse response = new MetadataResponse(future.value().responseBody());
                Cluster cluster = response.cluster();

                Set<String> denied = cluster.unauthorizedTopics();
                if (!denied.isEmpty()) {
                    throw new TopicAuthorizationException(denied);
                }

                boolean retryNeeded = false;
                if (!response.errors().isEmpty()) {
                    log.debug("Topic metadata fetch included errors: {}", response.errors());
                    for (Map.Entry<String, Errors> topicError : response.errors().entrySet()) {
                        String topic = topicError.getKey();
                        Errors error = topicError.getValue();
                        if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
                        } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                            // Unknown topics are simply skipped.
                            continue;
                        } else if (error.exception() instanceof RetriableException) {
                            retryNeeded = true;
                        } else {
                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic, error.exception());
                        }
                    }
                }

                if (!retryNeeded) {
                    HashMap<String, List<PartitionInfo>> partitionsByTopic = new HashMap<String, List<PartitionInfo>>();
                    for (String topic : cluster.topics()) {
                        partitionsByTopic.put(topic, cluster.availablePartitionsForTopic(topic));
                    }
                    return partitionsByTopic;
                }
            }

            // Back off before the next attempt, but never beyond the deadline.
            long elapsedMs = this.time.milliseconds() - startMs;
            remainingMs = timeout - elapsedMs;
            if (remainingMs > 0L) {
                long backoffMs = Math.min(remainingMs, this.retryBackoffMs);
                this.time.sleep(backoffMs);
                remainingMs -= backoffMs;
            }
        } while (remainingMs > 0L);

        throw new TimeoutException("Timeout expired while fetching topic metadata");
    }

    /**
     * Sends a metadata request for {@code topics} to the least loaded node.
     *
     * @param topics topics to request; {@code null} is replaced by an empty list
     * @return the request future, or a failed future when no node is available
     */
    private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) {
        List<String> requested = topics != null ? topics : Collections.<String>emptyList();
        Node target = this.client.leastLoadedNode();
        if (target == null) {
            return RequestFuture.noBrokersAvailable();
        }
        return this.client.send(target, ApiKeys.METADATA, new MetadataRequest(requested));
    }

    /**
     * Resets the position of {@code partition} according to its reset strategy by looking
     * up the corresponding offset and seeking to it.
     *
     * @param partition partition whose position is reset
     * @throws NoOffsetForPartitionException if the strategy is neither EARLIEST nor LATEST
     */
    private void resetOffset(TopicPartition partition) {
        OffsetResetStrategy strategy = this.subscriptions.resetStrategy(partition);
        final long timestamp;
        if (strategy == OffsetResetStrategy.EARLIEST) {
            // Sentinel timestamp sent in the list-offsets request for the earliest offset.
            timestamp = -2L;
        } else if (strategy == OffsetResetStrategy.LATEST) {
            // Sentinel timestamp sent in the list-offsets request for the latest offset.
            timestamp = -1L;
        } else {
            throw new NoOffsetForPartitionException(partition);
        }

        // Locale.ROOT keeps the logged name stable: the default-locale toLowerCase() maps
        // 'I' to a dotless 'ı' under e.g. a Turkish locale ("earlıest").
        log.debug("Resetting offset for partition {} to {} offset.", (Object) partition, (Object) strategy.name().toLowerCase(java.util.Locale.ROOT));
        long offset = this.listOffset(partition, timestamp);

        // The lookup blocks, so re-check the assignment before seeking.
        if (this.subscriptions.isAssigned(partition)) {
            this.subscriptions.seek(partition, offset);
        }
    }

    /**
     * Looks up the offset of {@code partition} for {@code timestamp}, retrying until the
     * request succeeds or fails with a non-retriable error. There is no timeout.
     *
     * @param partition partition to query
     * @param timestamp timestamp passed to the list-offsets request
     * @return the offset returned by the broker
     */
    private long listOffset(TopicPartition partition, long timestamp) {
        for (;;) {
            RequestFuture<Long> attempt = this.sendListOffsetRequest(partition, timestamp);
            this.client.poll(attempt);

            if (attempt.succeeded()) {
                return attempt.value();
            }
            if (!attempt.isRetriable()) {
                throw attempt.exception();
            }

            boolean metadataInvalid = attempt.exception() instanceof InvalidMetadataException;
            if (metadataInvalid) {
                // Retrying is pointless until metadata has been refreshed.
                this.client.awaitMetadataUpdate();
            } else {
                this.time.sleep(this.retryBackoffMs);
            }
        }
    }

    /**
     * Clears the recorded out-of-range partitions and throws for those that are still
     * fetchable and whose current position equals the offset that was out of range.
     *
     * @throws OffsetOutOfRangeException if at least one such partition remains
     */
    private void throwIfOffsetOutOfRange() throws OffsetOutOfRangeException {
        HashMap<TopicPartition, Long> stillOutOfRange = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> candidate : this.offsetOutOfRangePartitions.entrySet()) {
            TopicPartition tp = candidate.getKey();
            if (!this.subscriptions.isFetchable(tp)) {
                log.debug("Ignoring fetched records for {} since it is no longer fetchable", (Object) tp);
                continue;
            }
            Long position = this.subscriptions.position(tp);
            // Only report the partition if the position has not moved since the failed fetch.
            if (position != null && candidate.getValue().equals(position)) {
                stillOutOfRange.put(tp, candidate.getValue());
            }
        }
        this.offsetOutOfRangePartitions.clear();
        if (stillOutOfRange.isEmpty()) {
            return;
        }
        throw new OffsetOutOfRangeException(stillOutOfRange);
    }

    /**
     * Throws for the topics recorded as unauthorized, clearing the record first.
     *
     * @throws TopicAuthorizationException if any unauthorized topic was recorded
     */
    private void throwIfUnauthorizedTopics() throws TopicAuthorizationException {
        if (this.unauthorizedTopics.isEmpty()) {
            return;
        }
        // Snapshot before clearing so the exception carries the topics.
        HashSet<String> snapshot = new HashSet<String>(this.unauthorizedTopics);
        this.unauthorizedTopics.clear();
        throw new TopicAuthorizationException(snapshot);
    }

    /**
     * Throws for the partitions recorded as holding a record larger than the fetch size,
     * clearing the record first.
     *
     * @throws RecordTooLargeException if any such partition was recorded
     */
    private void throwIfRecordTooLarge() throws RecordTooLargeException {
        HashMap<TopicPartition, Long> snapshot = new HashMap<TopicPartition, Long>(this.recordTooLargePartitions);
        this.recordTooLargePartitions.clear();
        if (snapshot.isEmpty()) {
            return;
        }
        String message = "There are some messages at [Partition=Offset]: " + snapshot
                + " whose size is larger than the fetch size " + this.fetchSize
                + " and hence cannot be ever returned."
                + " Increase the fetch size, or decrease the maximum message size the broker will allow.";
        throw new RecordTooLargeException(message, snapshot);
    }

    /**
     * Drains the buffered fetch results and advances the position of every partition whose
     * records are returned.
     *
     * <p>Pending out-of-range, authorization and record-too-large conditions are thrown
     * before any record is returned. Buffered records are dropped when their partition is
     * no longer assigned, no longer fetchable, or no longer positioned at the offset they
     * were fetched from.
     *
     * @return fetched records keyed by partition; empty if a partition assignment is needed
     * @throws OffsetOutOfRangeException   see {@code throwIfOffsetOutOfRange}
     * @throws TopicAuthorizationException see {@code throwIfUnauthorizedTopics}
     * @throws RecordTooLargeException     see {@code throwIfRecordTooLarge}
     */
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        this.throwIfOffsetOutOfRange();
        this.throwIfUnauthorizedTopics();
        this.throwIfRecordTooLarge();
        for (PartitionRecords<K, V> part : this.records) {
            if (!this.subscriptions.isAssigned(part.partition)) {
                log.debug("Not returning fetched records for partition {} since it is no longer assigned", (Object) part.partition);
                continue;
            }
            if (!this.subscriptions.isFetchable(part.partition)) {
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", (Object) part.partition);
                continue;
            }
            // position() is treated as nullable elsewhere in this class, so it is read only
            // after the fetchability check and never unboxed blindly.
            Long position = this.subscriptions.position(part.partition);
            if (position == null || part.fetchOffset != position.longValue()) {
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", new Object[]{part.partition, part.fetchOffset, position});
                continue;
            }
            // Buffered batches are only added when non-empty (see handleFetchResponse).
            long nextOffset = part.records.get(part.records.size() - 1).offset() + 1L;
            log.trace("Returning fetched records at offset {} for assigned partition {} and update position to {}", new Object[]{position, part.partition, nextOffset});
            List<ConsumerRecord<K, V>> soFar = drained.get(part.partition);
            if (soFar == null) {
                drained.put(part.partition, part.records);
            } else {
                soFar.addAll(part.records);
            }
            this.subscriptions.position(part.partition, nextOffset);
        }
        this.records.clear();
        return drained;
    }

    /**
     * Sends a list-offsets request for one partition to its current leader.
     *
     * @param topicPartition partition to query
     * @param timestamp      timestamp placed in the request
     * @return a future completed by {@code handleListOffsetResponse}; a stale-metadata or
     *         leader-not-available future if the partition or its leader is unknown
     */
    private RequestFuture<Long> sendListOffsetRequest(final TopicPartition topicPartition, long timestamp) {
        HashMap<TopicPartition, ListOffsetRequest.PartitionData> query = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
        query.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));

        PartitionInfo partitionInfo = this.metadata.fetch().partition(topicPartition);
        if (partitionInfo == null) {
            // Register the topic so that metadata for it gets fetched.
            this.metadata.add(topicPartition.topic());
            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", (Object) topicPartition);
            return RequestFuture.staleMetadata();
        }

        Node leader = partitionInfo.leader();
        if (leader == null) {
            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", (Object) topicPartition);
            return RequestFuture.leaderNotAvailable();
        }

        ListOffsetRequest request = new ListOffsetRequest(-1, query);
        RequestFuture<ClientResponse> rawResponse = this.client.send(leader, ApiKeys.LIST_OFFSETS, request);
        return rawResponse.compose(new RequestFutureAdapter<ClientResponse, Long>() {
            @Override
            public void onSuccess(ClientResponse response, RequestFuture<Long> future) {
                Fetcher.this.handleListOffsetResponse(topicPartition, response, future);
            }
        });
    }

    /**
     * Completes or fails {@code future} from a list-offsets response.
     *
     * @param topicPartition partition that was queried
     * @param clientResponse raw response
     * @param future         future completed with the single returned offset, raised with the
     *                       broker error for leadership errors, or raised with a
     *                       {@code StaleMetadataException} for any other error
     * @throws IllegalStateException if a successful response does not hold exactly one offset
     */
    private void handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> future) {
        ListOffsetResponse parsed = new ListOffsetResponse(clientResponse.responseBody());
        short code = parsed.responseData().get(topicPartition).errorCode;

        if (code == Errors.NONE.code()) {
            List<Long> offsets = parsed.responseData().get(topicPartition).offsets;
            if (offsets.size() != 1) {
                throw new IllegalStateException("This should not happen.");
            }
            long offset = offsets.get(0);
            log.debug("Fetched offset {} for partition {}", (Object) offset, (Object) topicPartition);
            future.complete(offset);
            return;
        }

        if (code == Errors.NOT_LEADER_FOR_PARTITION.code() || code == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
            log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", (Object) topicPartition);
            future.raise(Errors.forCode(code));
            return;
        }

        log.error("Attempt to fetch offsets for partition {} failed due to: {}", (Object) topicPartition, (Object) Errors.forCode(code).exception().getMessage());
        future.raise(new StaleMetadataException());
    }

    /**
     * Builds one fetch request per leader node covering all fetchable partitions led by it.
     * Partitions without a known leader trigger a metadata update request and are skipped;
     * nodes that already have pending requests are skipped as well.
     *
     * @param cluster cluster view used to resolve partition leaders
     * @return fetch requests keyed by destination node
     */
    private Map<Node, FetchRequest> createFetchRequests(Cluster cluster) {
        Map<Node, Map<TopicPartition, FetchRequest.PartitionData>> dataByNode =
                new HashMap<Node, Map<TopicPartition, FetchRequest.PartitionData>>();

        for (TopicPartition partition : this.subscriptions.fetchablePartitions()) {
            Node leader = cluster.leaderFor(partition);
            if (leader == null) {
                this.metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(leader) == 0) {
                Map<TopicPartition, FetchRequest.PartitionData> nodeData = dataByNode.get(leader);
                if (nodeData == null) {
                    nodeData = new HashMap<TopicPartition, FetchRequest.PartitionData>();
                    dataByNode.put(leader, nodeData);
                }
                long position = this.subscriptions.position(partition);
                nodeData.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {}", (Object) partition, (Object) position);
            }
        }

        HashMap<Node, FetchRequest> requests = new HashMap<Node, FetchRequest>();
        for (Map.Entry<Node, Map<TopicPartition, FetchRequest.PartitionData>> nodeEntry : dataByNode.entrySet()) {
            FetchRequest request = new FetchRequest(this.maxWaitMs, this.minBytes, nodeEntry.getValue());
            requests.put(nodeEntry.getKey(), request);
        }
        return requests;
    }

    /**
     * Processes the response to a fetch request: buffers parsed records, records error
     * conditions that are surfaced later by {@code fetchedRecords}, and updates the fetch
     * sensors.
     *
     * @param resp    raw response to the fetch request
     * @param request the request that was sent; used to look up the offset each partition
     *                was fetched from
     * @throws IllegalStateException if a partition carries an error code not handled here
     */
    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
        int totalBytes = 0;
        int totalCount = 0;
        FetchResponse response = new FetchResponse(resp.responseBody());
        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
            long fetchOffset;
            TopicPartition tp = entry.getKey();
            FetchResponse.PartitionData partition = entry.getValue();
            // The partition may have stopped being fetchable while the request was in flight.
            if (!this.subscriptions.isFetchable(tp)) {
                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", (Object)tp);
                continue;
            }
            if (partition.errorCode == Errors.NONE.code()) {
                fetchOffset = request.fetchData().get((Object)tp).offset;
                Long position = this.subscriptions.position(tp);
                // Drop the data if the position changed since the request was built.
                if (position == null || position != fetchOffset) {
                    log.debug("Discarding fetch response for partition {} since its offset {} does not match the expected offset {}", new Object[]{tp, fetchOffset, position});
                    continue;
                }
                int bytes = 0;
                ByteBuffer buffer = partition.recordSet;
                MemoryRecords records = MemoryRecords.readableRecords(buffer);
                ArrayList parsed = new ArrayList();
                for (LogEntry logEntry : records) {
                    parsed.add(this.parseRecord(tp, logEntry));
                    bytes += logEntry.size();
                }
                if (!parsed.isEmpty()) {
                    log.trace("Adding fetched record for partition {} with offset {} to buffered record list", (Object)tp, (Object)position);
                    ConsumerRecord record = (ConsumerRecord)parsed.get(parsed.size() - 1);
                    this.records.add(new PartitionRecords(fetchOffset, tp, parsed));
                    // Lag is measured from the last parsed record to the partition's high watermark.
                    this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                } else if (buffer.limit() > 0) {
                    // Bytes were returned but no record could be iterated from them; this is
                    // reported later as a record-too-large condition (see throwIfRecordTooLarge).
                    this.recordTooLargePartitions.put(tp, fetchOffset);
                }
                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
                totalBytes += bytes;
                totalCount += parsed.size();
                continue;
            }
            // Leadership/topic errors: only ask for a metadata refresh.
            if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                this.metadata.requestUpdate();
                continue;
            }
            if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                fetchOffset = request.fetchData().get((Object)tp).offset;
                // With a default reset policy the partition is marked for reset; otherwise the
                // offset is remembered so throwIfOffsetOutOfRange can raise it to the caller.
                if (this.subscriptions.hasDefaultOffsetResetPolicy()) {
                    this.subscriptions.needOffsetReset(tp);
                } else {
                    this.offsetOutOfRangePartitions.put(tp, fetchOffset);
                }
                // NOTE(review): this message says "resetting offset" on both branches above.
                log.info("Fetch offset {} is out of range, resetting offset", (Object)fetchOffset);
                continue;
            }
            if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) {
                log.warn("Not authorized to read from topic {}.", (Object)tp.topic());
                // Surfaced later by throwIfUnauthorizedTopics.
                this.unauthorizedTopics.add(tp.topic());
                continue;
            }
            if (partition.errorCode == Errors.UNKNOWN.code()) {
                log.warn("Unknown error fetching data for topic-partition {}", (Object)tp);
                continue;
            }
            throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
        }
        // Request-level metrics, recorded once per response.
        this.sensors.bytesFetched.record(totalBytes);
        this.sensors.recordsFetched.record(totalCount);
        this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
        this.sensors.fetchLatency.record(resp.requestLatencyMs());
    }

    /**
     * Converts one log entry into a consumer record, deserializing key and value with the
     * configured deserializers. A {@code null} key or value buffer yields a {@code null}
     * key or value.
     *
     * @param partition partition the entry was fetched from
     * @param logEntry  entry to convert
     * @return the deserialized record
     * @throws KafkaException if validation or deserialization fails; a {@code KafkaException}
     *                        is rethrown as is, any other runtime exception is wrapped
     */
    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
        try {
            if (this.checkCrcs) {
                logEntry.record().ensureValid();
            }
            long offset = logEntry.offset();

            ByteBuffer keyBytes = logEntry.record().key();
            K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));

            ByteBuffer valueBytes = logEntry.record().value();
            V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));

            return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
        }
        catch (KafkaException e) {
            throw e;
        }
        catch (RuntimeException e) {
            // Keep the cause and add where the failure happened.
            throw new KafkaException("Error deserializing key/value for partition " + partition + " at offset " + logEntry.offset(), e);
        }
    }

    private class FetchManagerMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor bytesFetched;
        public final Sensor recordsFetched;
        public final Sensor fetchLatency;
        public final Sensor recordsFetchLag;
        public final Sensor fetchThrottleTimeSensor;

        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
            this.bytesFetched = metrics.sensor("bytes-fetched");
            this.bytesFetched.add(new MetricName("fetch-size-avg", this.metricGrpName, "The average number of bytes fetched per request", tags), new Avg());
            this.bytesFetched.add(new MetricName("fetch-size-max", this.metricGrpName, "The maximum number of bytes fetched per request", tags), new Max());
            this.bytesFetched.add(new MetricName("bytes-consumed-rate", this.metricGrpName, "The average number of bytes consumed per second", tags), new Rate());
            this.recordsFetched = metrics.sensor("records-fetched");
            this.recordsFetched.add(new MetricName("records-per-request-avg", this.metricGrpName, "The average number of records in each request", tags), new Avg());
            this.recordsFetched.add(new MetricName("records-consumed-rate", this.metricGrpName, "The average number of records consumed per second", tags), new Rate());
            this.fetchLatency = metrics.sensor("fetch-latency");
            this.fetchLatency.add(new MetricName("fetch-latency-avg", this.metricGrpName, "The average time taken for a fetch request.", tags), new Avg());
            this.fetchLatency.add(new MetricName("fetch-latency-max", this.metricGrpName, "The max time taken for any fetch request.", tags), new Max());
            this.fetchLatency.add(new MetricName("fetch-rate", this.metricGrpName, "The number of fetch requests per second.", tags), new Rate(new Count()));
            this.recordsFetchLag = metrics.sensor("records-lag");
            this.recordsFetchLag.add(new MetricName("records-lag-max", this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window", tags), new Max());
            this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-avg", this.metricGrpName, "The average throttle time in ms", tags), new Avg());
            this.fetchThrottleTimeSensor.add(new MetricName("fetch-throttle-time-max", this.metricGrpName, "The maximum throttle time in ms", tags), new Max());
        }

        public void recordTopicFetchMetrics(String topic, int bytes, int records) {
            String name = "topic." + topic + ".bytes-fetched";
            Sensor bytesFetched = this.metrics.getSensor(name);
            if (bytesFetched == null) {
                bytesFetched = this.metrics.sensor(name);
            }
            bytesFetched.record(bytes);
            name = "topic." + topic + ".records-fetched";
            Sensor recordsFetched = this.metrics.getSensor(name);
            if (recordsFetched == null) {
                recordsFetched = this.metrics.sensor(name);
            }
            recordsFetched.record(records);
        }
    }

    /**
     * Records parsed from one partition of a fetch response, together with the offset the
     * fetch was issued at. The fields are never reassigned after construction, hence
     * {@code final}; the record list itself stays mutable and may be appended to when
     * batches are merged in {@code fetchedRecords}.
     */
    private static class PartitionRecords<K, V> {
        /** Offset the fetch request for this partition started at. */
        public final long fetchOffset;
        /** Partition the records belong to. */
        public final TopicPartition partition;
        /** Parsed records, in the order they were read from the response. */
        public final List<ConsumerRecord<K, V>> records;

        public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
            this.fetchOffset = fetchOffset;
            this.partition = partition;
            this.records = records;
        }
    }
}

