/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fetcher<K, V> {
    /** Logger shared by all Fetcher instances. */
    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
    /** Network client used to check node readiness, count in-flight requests and send requests. */
    private final KafkaClient client;
    /** Clock used to timestamp outgoing requests. */
    private final Time time;
    /** Second constructor argument of every FetchRequest built by this class. */
    private final int minBytes;
    /** First constructor argument of every FetchRequest built by this class. */
    private final int maxWaitMs;
    /** Second constructor argument of every FetchRequest.PartitionData built by this class. */
    private final int fetchSize;
    /** When true, parseRecord() calls ensureValid() on each record before deserializing it. */
    private final boolean checkCrcs;
    /** Cluster metadata; used to look up partition info and to request metadata updates. */
    private final Metadata metadata;
    /** Sensors that receive the fetch size, record count, latency and lag measurements. */
    private final FetchManagerMetrics sensors;
    /** Source of the assigned partitions and of their fetched / consumed positions. */
    private final SubscriptionState subscriptions;
    /** Parsed record batches buffered by handleFetchResponse() until fetchedRecords() drains them. */
    private final List<PartitionRecords<K, V>> records;
    /** Deserializer applied to non-null record keys. */
    private final Deserializer<K> keyDeserializer;
    /** Deserializer applied to non-null record values. */
    private final Deserializer<V> valueDeserializer;

    /**
     * Creates a fetcher that stores the supplied collaborators and settings, starts with an
     * empty buffer of fetched records, and registers its sensors on {@code metrics}.
     *
     * @param client            network client used to send requests
     * @param minBytes          value passed to every fetch request as its min-bytes argument
     * @param maxWaitMs         value passed to every fetch request as its max-wait argument
     * @param fetchSize         per-partition size passed to every fetch request
     * @param checkCrcs         whether each record is validated before it is deserialized
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param metadata          cluster metadata
     * @param subscriptions     assignment and position state
     * @param metrics           registry on which the fetch sensors are created
     * @param metricGrpPrefix   prefix of the metric group name
     * @param metricTags        tags attached to every metric name created here
     * @param time              clock used to timestamp requests
     */
    public Fetcher(KafkaClient client, int minBytes, int maxWaitMs, int fetchSize, boolean checkCrcs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, Time time) {
        // Collaborators.
        this.client = client;
        this.metadata = metadata;
        this.subscriptions = subscriptions;
        this.time = time;

        // Fetch request settings.
        this.minBytes = minBytes;
        this.maxWaitMs = maxWaitMs;
        this.fetchSize = fetchSize;
        this.checkCrcs = checkCrcs;

        // Record decoding.
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;

        // Internal state.
        this.records = new LinkedList<PartitionRecords<K, V>>();
        this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix, metricTags);
    }

    /**
     * Builds the pending fetch requests for {@code cluster} and sends each one whose
     * destination node the client reports as ready. Requests for nodes that are not ready
     * are dropped.
     *
     * @param cluster cluster view used to build the requests and resolve destination nodes
     * @param now     current time passed to the client's readiness check
     */
    public void initFetches(Cluster cluster, long now) {
        for (ClientRequest fetchRequest : this.createFetchRequests(cluster)) {
            // The request destination is the node id rendered as a string.
            String destination = fetchRequest.request().destination();
            Node target = cluster.nodeById(Integer.parseInt(destination));
            if (this.client.ready(target, now)) {
                log.trace("Initiating fetch to node {}: {}", target.id(), fetchRequest);
                this.client.send(fetchRequest);
            }
        }
    }

    /**
     * Drains the buffered fetch results into a per-partition map.
     *
     * <p>While the subscription state reports that a partition assignment is needed, an empty
     * map is returned and the buffer is left untouched. Otherwise every buffered batch is
     * either handed out or discarded, and the buffer is cleared.
     *
     * <p>A batch is handed out only if its partition is still assigned and its fetch offset
     * equals the partition's current consumed position (or no consumed position is set).
     * Handing out a batch moves the consumed position to one past the batch's last record.
     *
     * @return the drained records keyed by partition; empty if nothing qualified
     */
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.subscriptions.partitionAssignmentNeeded()) {
            return Collections.emptyMap();
        }
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        for (PartitionRecords<K, V> part : this.records) {
            Long consumed = this.subscriptions.consumed(part.partition);
            if (this.subscriptions.assignedPartitions().contains(part.partition)
                    && (consumed == null || part.fetchOffset == consumed.longValue())) {
                List<ConsumerRecord<K, V>> partitionRecords = drained.get(part.partition);
                if (partitionRecords == null) {
                    // First batch for this partition: reuse its list as the result list.
                    drained.put(part.partition, part.records);
                } else {
                    partitionRecords.addAll(part.records);
                }
                long lastOffset = part.records.get(part.records.size() - 1).offset();
                this.subscriptions.consumed(part.partition, lastOffset + 1L);
            } else {
                // Not the next records in line for this partition (or no longer assigned).
                log.debug("Ignoring fetched records for {} at offset {}", part.partition, part.fetchOffset);
            }
        }
        this.records.clear();
        return drained;
    }

    /**
     * Sends a list-offsets request for one partition to that partition's leader.
     *
     * <ul>
     *   <li>If the metadata has no entry for the partition, its topic is added to the metadata
     *       and {@link RequestFuture#metadataRefreshNeeded()} is returned.</li>
     *   <li>If the partition has no leader, {@link RequestFuture#metadataRefreshNeeded()} is
     *       returned.</li>
     *   <li>If the client is not ready to send to the leader,
     *       {@link RequestFuture#pollNeeded()} is returned.</li>
     *   <li>Otherwise the request is sent and the returned future is resolved by the
     *       response handler.</li>
     * </ul>
     *
     * @param topicPartition partition whose offset is requested
     * @param timestamp      timestamp placed in the request for that partition
     * @return a future for the offset, or one of the retry markers described above
     */
    public RequestFuture<Long> listOffset(final TopicPartition topicPartition, long timestamp) {
        long now = this.time.milliseconds();
        PartitionInfo partitionInfo = this.metadata.fetch().partition(topicPartition);
        if (partitionInfo == null) {
            this.metadata.add(topicPartition.topic());
            log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", topicPartition);
            return RequestFuture.metadataRefreshNeeded();
        }

        Node leader = partitionInfo.leader();
        if (leader == null) {
            log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", topicPartition);
            return RequestFuture.metadataRefreshNeeded();
        }

        if (!this.client.ready(leader, now)) {
            return RequestFuture.pollNeeded();
        }

        final RequestFuture<Long> future = new RequestFuture<Long>();

        // Ask for a single offset; the response handler rejects any other count.
        Map<TopicPartition, ListOffsetRequest.PartitionData> query = new HashMap<TopicPartition, ListOffsetRequest.PartitionData>(1);
        query.put(topicPartition, new ListOffsetRequest.PartitionData(timestamp, 1));

        // NOTE(review): -1 is presumably the "consumer" replica id - confirm against ListOffsetRequest.
        ListOffsetRequest request = new ListOffsetRequest(-1, query);
        RequestSend send = new RequestSend(leader.idString(), this.client.nextRequestHeader(ApiKeys.LIST_OFFSETS), request.toStruct());
        RequestCompletionHandler onResponse = new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse resp) {
                Fetcher.this.handleListOffsetResponse(topicPartition, resp, future);
            }
        };
        this.client.send(new ClientRequest(now, true, send, onResponse));
        return future;
    }

    /**
     * Resolves {@code future} from the response to a list-offsets request.
     *
     * <p>A disconnect or any non-zero error code marks the future for retry after a metadata
     * refresh; the error cases differ only in what is logged. A successful response completes
     * the future with the single returned offset.
     *
     * @param topicPartition partition the request was sent for
     * @param clientResponse response (or disconnect notification) from the client
     * @param future         future to complete or mark for retry
     * @throws IllegalStateException if a successful response does not carry exactly one offset
     */
    private void handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> future) {
        if (clientResponse.wasDisconnected()) {
            future.retryAfterMetadataRefresh();
            return;
        }

        ListOffsetResponse response = new ListOffsetResponse(clientResponse.responseBody());
        ListOffsetResponse.PartitionData partitionData = response.responseData().get(topicPartition);
        short errorCode = partitionData.errorCode;

        if (errorCode == Errors.NONE.code()) {
            List<Long> offsets = partitionData.offsets;
            if (offsets.size() != 1) {
                throw new IllegalStateException("This should not happen.");
            }
            long offset = offsets.get(0);
            log.debug("Fetched offset {} for partition {}", offset, topicPartition);
            future.complete(offset);
            return;
        }

        boolean staleLeadership = errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code();
        if (staleLeadership) {
            log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition);
        } else {
            log.error("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, Errors.forCode(errorCode).exception().getMessage());
        }
        // Every error path ends in the same retry request.
        future.retryAfterMetadataRefresh();
    }

    /**
     * Builds one fetch request per leader node that currently has no in-flight requests.
     *
     * <p>For each assigned partition: if the cluster knows no leader, a metadata update is
     * requested and the partition is skipped; if the leader already has in-flight requests,
     * the partition is skipped; otherwise the partition is added to that leader's request
     * at its current fetched position.
     *
     * @param cluster cluster view used to resolve partition leaders
     * @return the requests to send, one per eligible node; possibly empty
     */
    private List<ClientRequest> createFetchRequests(Cluster cluster) {
        // node id -> (partition -> fetch position and size)
        Map<Integer, Map<TopicPartition, FetchRequest.PartitionData>> fetchable = new HashMap<Integer, Map<TopicPartition, FetchRequest.PartitionData>>();
        for (TopicPartition partition : this.subscriptions.assignedPartitions()) {
            Node node = cluster.leaderFor(partition);
            if (node == null) {
                this.metadata.requestUpdate();
            } else if (this.client.inFlightRequestCount(node.idString()) == 0) {
                Map<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node.id());
                if (fetch == null) {
                    fetch = new HashMap<TopicPartition, FetchRequest.PartitionData>();
                    fetchable.put(node.id(), fetch);
                }
                long offset = this.subscriptions.fetched(partition);
                fetch.put(partition, new FetchRequest.PartitionData(offset, this.fetchSize));
            }
        }

        List<ClientRequest> requests = new ArrayList<ClientRequest>(fetchable.size());
        for (Map.Entry<Integer, Map<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            int nodeId = entry.getKey();
            // final: captured by the completion handler so the response can be matched to the
            // offsets that were requested.
            final FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, entry.getValue());
            RequestSend send = new RequestSend(Integer.toString(nodeId), this.client.nextRequestHeader(ApiKeys.FETCH), fetch.toStruct());
            RequestCompletionHandler handler = new RequestCompletionHandler() {
                @Override
                public void onComplete(ClientResponse response) {
                    Fetcher.this.handleFetchResponse(response, fetch);
                }
            };
            requests.add(new ClientRequest(this.time.milliseconds(), true, send, handler));
        }
        return requests;
    }

    /**
     * Processes the response to a fetch request.
     *
     * <p>On disconnect only a debug message is logged. Otherwise each partition in the
     * response is handled as follows:
     * <ul>
     *   <li>no longer assigned: ignored;</li>
     *   <li>no error: records are parsed; if any were parsed they are buffered, the fetched
     *       position moves to one past the last record, and the lag is recorded. Per-topic
     *       metrics are recorded even when nothing was parsed;</li>
     *   <li>not-leader / unknown-topic-or-partition: a metadata update is requested;</li>
     *   <li>offset-out-of-range: the partition is flagged for an offset reset;</li>
     *   <li>unknown error: a warning is logged;</li>
     *   <li>any other error code: {@link IllegalStateException}.</li>
     * </ul>
     * The request latency is recorded in both the disconnect and the normal case.
     *
     * @param resp    response (or disconnect notification) from the client
     * @param request the fetch request this response answers; supplies the requested offsets
     * @throws IllegalStateException if a partition reports an error code not handled above
     */
    private void handleFetchResponse(ClientResponse resp, FetchRequest request) {
        if (resp.wasDisconnected()) {
            int correlation = resp.request().request().header().correlationId();
            log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected", new Object[]{resp.request(), correlation, resp.request().request().destination()});
        } else {
            int totalBytes = 0;
            int totalCount = 0;
            FetchResponse response = new FetchResponse(resp.responseBody());
            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                FetchResponse.PartitionData partition = entry.getValue();
                if (!this.subscriptions.assignedPartitions().contains(tp)) {
                    log.debug("Ignoring fetched data for partition {} which is no longer assigned.", tp);
                    continue;
                }

                if (partition.errorCode == Errors.NONE.code()) {
                    int bytes = 0;
                    ByteBuffer buffer = partition.recordSet;
                    MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
                    // Offset that was requested; fetchedRecords() compares it with the consumed position.
                    long fetchOffset = request.fetchData().get(tp).offset;
                    List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>();
                    for (LogEntry logEntry : memoryRecords) {
                        parsed.add(this.parseRecord(tp, logEntry));
                        bytes += logEntry.size();
                    }
                    if (!parsed.isEmpty()) {
                        ConsumerRecord<K, V> last = parsed.get(parsed.size() - 1);
                        this.subscriptions.fetched(tp, last.offset() + 1L);
                        this.records.add(new PartitionRecords<K, V>(fetchOffset, tp, parsed));
                        this.sensors.recordsFetchLag.record(partition.highWatermark - last.offset());
                    }
                    this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size());
                    totalBytes += bytes;
                    totalCount += parsed.size();
                } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code()
                        || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                    this.metadata.requestUpdate();
                } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) {
                    log.info("Fetch offset {} is out of range, resetting offset", this.subscriptions.fetched(tp));
                    this.subscriptions.needOffsetReset(tp);
                } else if (partition.errorCode == Errors.UNKNOWN.code()) {
                    log.warn("Unknown error fetching data for topic-partition {}", tp);
                } else {
                    throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data");
                }
            }
            this.sensors.bytesFetched.record(totalBytes);
            this.sensors.recordsFetched.record(totalCount);
        }
        this.sensors.fetchLatency.record(resp.requestLatencyMs());
    }

    /**
     * Converts one log entry into a typed consumer record.
     *
     * <p>When CRC checking is enabled the record is validated first. A null key or value
     * buffer yields a null key or value; the corresponding deserializer is not called.
     *
     * @param partition partition the entry was fetched from; supplies topic and partition number
     * @param logEntry  raw entry holding the offset and the key/value bytes
     * @return the deserialized record carrying the entry's offset
     */
    private ConsumerRecord<K, V> parseRecord(TopicPartition partition, LogEntry logEntry) {
        if (this.checkCrcs) {
            logEntry.record().ensureValid();
        }
        long offset = logEntry.offset();
        ByteBuffer keyBytes = logEntry.record().key();
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), Utils.toArray(keyBytes));
        ByteBuffer valueBytes = logEntry.record().value();
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), Utils.toArray(valueBytes));
        return new ConsumerRecord<K, V>(partition.topic(), partition.partition(), offset, key, value);
    }

    private class FetchManagerMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor bytesFetched;
        public final Sensor recordsFetched;
        public final Sensor fetchLatency;
        public final Sensor recordsFetchLag;

        public FetchManagerMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
            this.bytesFetched = metrics.sensor("bytes-fetched");
            this.bytesFetched.add(new MetricName("fetch-size-avg", this.metricGrpName, "The average number of bytes fetched per request", tags), new Avg());
            this.bytesFetched.add(new MetricName("fetch-size-max", this.metricGrpName, "The maximum number of bytes fetched per request", tags), new Max());
            this.bytesFetched.add(new MetricName("bytes-consumed-rate", this.metricGrpName, "The average number of bytes consumed per second", tags), new Rate());
            this.recordsFetched = metrics.sensor("records-fetched");
            this.recordsFetched.add(new MetricName("records-per-request-avg", this.metricGrpName, "The average number of records in each request", tags), new Avg());
            this.recordsFetched.add(new MetricName("records-consumed-rate", this.metricGrpName, "The average number of records consumed per second", tags), new Rate());
            this.fetchLatency = metrics.sensor("fetch-latency");
            this.fetchLatency.add(new MetricName("fetch-latency-avg", this.metricGrpName, "The average time taken for a fetch request.", tags), new Avg());
            this.fetchLatency.add(new MetricName("fetch-latency-max", this.metricGrpName, "The max time taken for any fetch request.", tags), new Max());
            this.fetchLatency.add(new MetricName("fetch-rate", this.metricGrpName, "The number of fetch requests per second.", tags), new Rate(new Count()));
            this.recordsFetchLag = metrics.sensor("records-lag");
            this.recordsFetchLag.add(new MetricName("records-lag-max", this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window", tags), new Max());
        }

        public void recordTopicFetchMetrics(String topic, int bytes, int records) {
            String name = "topic." + topic + ".bytes-fetched";
            Sensor bytesFetched = this.metrics.getSensor(name);
            if (bytesFetched == null) {
                bytesFetched = this.metrics.sensor(name);
            }
            bytesFetched.record(bytes);
            name = "topic." + topic + ".records-fetched";
            Sensor recordsFetched = this.metrics.getSensor(name);
            if (recordsFetched == null) {
                recordsFetched = this.metrics.sensor(name);
            }
            recordsFetched.record(records);
        }
    }

    /**
     * One batch of parsed records for a single partition, together with the offset that was
     * requested when the batch was fetched.
     *
     * <p>The fields are final; the {@code records} list itself stays mutable and may be
     * appended to by the enclosing class when batches are merged.
     */
    private static class PartitionRecords<K, V> {
        /** Offset that was sent in the fetch request which produced this batch. */
        public final long fetchOffset;
        /** Partition the records belong to. */
        public final TopicPartition partition;
        /** Parsed records, in the order they were read from the response. */
        public final List<ConsumerRecord<K, V>> records;

        /**
         * @param fetchOffset offset requested for this batch
         * @param partition   partition the records were fetched from
         * @param records     parsed records of the batch
         */
        public PartitionRecords(long fetchOffset, TopicPartition partition, List<ConsumerRecord<K, V>> records) {
            this.fetchOffset = fetchOffset;
            this.partition = partition;
            this.records = records;
        }
    }
}

