package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ConsumerMetadataRequest;
import org.apache.kafka.common.requests.ConsumerMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Coordinator.class */
public final class Coordinator {
    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
    private final KafkaClient client;
    private final Time time;
    private final String groupId;
    private final Heartbeat heartbeat;
    private final int sessionTimeoutMs;
    private final String assignmentStrategy;
    private final SubscriptionState subscriptions;
    private final CoordinatorMetrics sensors;
    private int generation = -1;
    private String consumerId = "";
    private Node consumerCoordinator = null;

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Coordinator$CommitOffsetCompletionHandler.class */
    private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
        private final Map<TopicPartition, Long> offsets;
        private final RequestFuture<Void> future;

        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> map, RequestFuture<Void> requestFuture) {
            this.offsets = map;
            this.future = requestFuture;
        }

        @Override // org.apache.kafka.clients.RequestCompletionHandler
        public void onComplete(ClientResponse clientResponse) {
            if (clientResponse.wasDisconnected()) {
                Coordinator.this.handleCoordinatorDisconnect(clientResponse);
                this.future.retryWithNewCoordinator();
            } else {
                for (Map.Entry<TopicPartition, Short> entry : new OffsetCommitResponse(clientResponse.responseBody()).responseData().entrySet()) {
                    TopicPartition key = entry.getKey();
                    short shortValue = entry.getValue().shortValue();
                    long longValue = this.offsets.get(key).longValue();
                    if (shortValue == Errors.NONE.code()) {
                        Coordinator.log.debug("Committed offset {} for partition {}", Long.valueOf(longValue), key);
                        Coordinator.this.subscriptions.committed(key, longValue);
                    } else if (shortValue == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || shortValue == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                        Coordinator.this.coordinatorDead();
                        this.future.retryWithNewCoordinator();
                    } else {
                        this.future.retryAfterBackoff();
                        Coordinator.log.error("Error committing partition {} at offset {}: {}", new Object[]{key, Long.valueOf(longValue), Errors.forCode(shortValue).exception().getMessage()});
                    }
                }
                if (!this.future.isDone()) {
                    this.future.complete(null);
                }
            }
            Coordinator.this.sensors.commitLatency.record(clientResponse.requestLatencyMs());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Coordinator$CoordinatorMetrics.class */
    public class CoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor commitLatency;
        public final Sensor heartbeatLatency;
        public final Sensor partitionReassignments;

        public CoordinatorMetrics(Metrics metrics, String str, Map<String, String> map) {
            this.metrics = metrics;
            this.metricGrpName = str + "-coordinator-metrics";
            this.commitLatency = metrics.sensor("commit-latency");
            this.commitLatency.add(new MetricName("commit-latency-avg", this.metricGrpName, "The average time taken for a commit request", map), new Avg());
            this.commitLatency.add(new MetricName("commit-latency-max", this.metricGrpName, "The max time taken for a commit request", map), new Max());
            this.commitLatency.add(new MetricName("commit-rate", this.metricGrpName, "The number of commit calls per second", map), new Rate(new Count()));
            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a hearbeat request", map), new Max());
            this.heartbeatLatency.add(new MetricName("heartbeat-rate", this.metricGrpName, "The average number of heartbeats per second", map), new Rate(new Count()));
            this.partitionReassignments = metrics.sensor("reassignment-latency");
            this.partitionReassignments.add(new MetricName("reassignment-time-avg", this.metricGrpName, "The average time taken for a partition reassignment", map), new Avg());
            this.partitionReassignments.add(new MetricName("reassignment-time-max", this.metricGrpName, "The max time taken for a partition reassignment", map), new Avg());
            this.partitionReassignments.add(new MetricName("reassignment-rate", this.metricGrpName, "The number of partition reassignments per second", map), new Rate(new Count()));
            metrics.addMetric(new MetricName("assigned-partitions", this.metricGrpName, "The number of partitions currently assigned to this consumer", map), new Measurable() { // from class: org.apache.kafka.clients.consumer.internals.Coordinator.CoordinatorMetrics.1
                @Override // org.apache.kafka.common.metrics.Measurable
                public double measure(MetricConfig metricConfig, long j) {
                    return Coordinator.this.subscriptions.assignedPartitions().size();
                }
            });
            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last controller heartbeat", map), new Measurable() { // from class: org.apache.kafka.clients.consumer.internals.Coordinator.CoordinatorMetrics.2
                @Override // org.apache.kafka.common.metrics.Measurable
                public double measure(MetricConfig metricConfig, long j) {
                    return TimeUnit.SECONDS.convert(j - Coordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Coordinator$HeartbeatCompletionHandler.class */
    private class HeartbeatCompletionHandler implements RequestCompletionHandler {
        private HeartbeatCompletionHandler() {
        }

        @Override // org.apache.kafka.clients.RequestCompletionHandler
        public void onComplete(ClientResponse clientResponse) {
            if (clientResponse.wasDisconnected()) {
                Coordinator.this.handleCoordinatorDisconnect(clientResponse);
            } else {
                HeartbeatResponse heartbeatResponse = new HeartbeatResponse(clientResponse.responseBody());
                if (heartbeatResponse.errorCode() == Errors.NONE.code()) {
                    Coordinator.log.debug("Received successful heartbeat response.");
                } else if (heartbeatResponse.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || heartbeatResponse.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                    Coordinator.log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                    Coordinator.this.coordinatorDead();
                } else if (heartbeatResponse.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
                    Coordinator.log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
                    Coordinator.this.subscriptions.needReassignment();
                } else {
                    if (heartbeatResponse.errorCode() != Errors.UNKNOWN_CONSUMER_ID.code()) {
                        throw new KafkaException("Unexpected error in heartbeat response: " + Errors.forCode(heartbeatResponse.errorCode()).exception().getMessage());
                    }
                    Coordinator.log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
                    Coordinator.this.consumerId = "";
                    Coordinator.this.subscriptions.needReassignment();
                }
            }
            Coordinator.this.sensors.heartbeatLatency.record(clientResponse.requestLatencyMs());
        }
    }

    public Coordinator(KafkaClient kafkaClient, String str, int i, String str2, SubscriptionState subscriptionState, Metrics metrics, String str3, Map<String, String> map, Time time) {
        this.time = time;
        this.client = kafkaClient;
        this.groupId = str;
        this.subscriptions = subscriptionState;
        this.sessionTimeoutMs = i;
        this.assignmentStrategy = str2;
        this.heartbeat = new Heartbeat(this.sessionTimeoutMs, time.milliseconds());
        this.sensors = new CoordinatorMetrics(metrics, str3, map);
    }

    public RequestFuture<Void> assignPartitions(long j) {
        final RequestFuture<Void> newCoordinatorRequestFuture = newCoordinatorRequestFuture(j);
        if (newCoordinatorRequestFuture.isDone()) {
            return newCoordinatorRequestFuture;
        }
        ArrayList arrayList = new ArrayList(this.subscriptions.subscribedTopics());
        log.debug("(Re-)joining group {} with subscribed topics {}", this.groupId, arrayList);
        JoinGroupRequest joinGroupRequest = new JoinGroupRequest(this.groupId, this.sessionTimeoutMs, arrayList, this.consumerId, this.assignmentStrategy);
        log.debug("Issuing request ({}: {}) to coordinator {}", new Object[]{ApiKeys.JOIN_GROUP, joinGroupRequest, Integer.valueOf(this.consumerCoordinator.id())});
        sendCoordinator(ApiKeys.JOIN_GROUP, joinGroupRequest.toStruct(), new RequestCompletionHandler() { // from class: org.apache.kafka.clients.consumer.internals.Coordinator.1
            @Override // org.apache.kafka.clients.RequestCompletionHandler
            public void onComplete(ClientResponse clientResponse) {
                Coordinator.this.handleJoinResponse(clientResponse, newCoordinatorRequestFuture);
            }
        }, j);
        return newCoordinatorRequestFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleJoinResponse(ClientResponse clientResponse, RequestFuture<Void> requestFuture) {
        if (clientResponse.wasDisconnected()) {
            handleCoordinatorDisconnect(clientResponse);
            requestFuture.retryWithNewCoordinator();
            return;
        }
        JoinGroupResponse joinGroupResponse = new JoinGroupResponse(clientResponse.responseBody());
        short errorCode = joinGroupResponse.errorCode();
        if (errorCode == Errors.NONE.code()) {
            this.consumerId = joinGroupResponse.consumerId();
            this.generation = joinGroupResponse.generationId();
            this.subscriptions.needRefreshCommits();
            log.debug("Joined group: {}", clientResponse);
            this.sensors.partitionReassignments.record(clientResponse.requestLatencyMs());
            this.subscriptions.changePartitionAssignment(joinGroupResponse.assignedPartitions());
            requestFuture.complete(null);
            return;
        }
        if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
            this.consumerId = "";
            log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.", this.groupId);
            requestFuture.retryNow();
        } else if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
            coordinatorDead();
            log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", this.groupId);
            requestFuture.retryWithNewCoordinator();
        } else {
            if (errorCode != Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code() && errorCode != Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code() && errorCode != Errors.INVALID_SESSION_TIMEOUT.code()) {
                requestFuture.raise(new KafkaException("Unexpected error in join group response: " + Errors.forCode(joinGroupResponse.errorCode()).exception().getMessage()));
                return;
            }
            ApiException exception = Errors.forCode(errorCode).exception();
            log.error("Attempt to join group {} failed due to: {}", this.groupId, exception.getMessage());
            requestFuture.raise(exception);
        }
    }

    public RequestFuture<Void> commitOffsets(Map<TopicPartition, Long> map, long j) {
        RequestFuture<Void> newCoordinatorRequestFuture = newCoordinatorRequestFuture(j);
        if (newCoordinatorRequestFuture.isDone()) {
            return newCoordinatorRequestFuture;
        }
        if (map.isEmpty()) {
            newCoordinatorRequestFuture.complete(null);
        } else {
            HashMap hashMap = new HashMap(map.size());
            for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                hashMap.put(entry.getKey(), new OffsetCommitRequest.PartitionData(entry.getValue().longValue(), ""));
            }
            OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(this.groupId, this.generation, this.consumerId, -1L, hashMap);
            sendCoordinator(ApiKeys.OFFSET_COMMIT, offsetCommitRequest.toStruct(), new CommitOffsetCompletionHandler(map, newCoordinatorRequestFuture), j);
        }
        return newCoordinatorRequestFuture;
    }

    private <T> RequestFuture<T> newCoordinatorRequestFuture(long j) {
        if (coordinatorUnknown()) {
            return RequestFuture.newCoordinatorNeeded();
        }
        if (this.client.ready(this.consumerCoordinator, j)) {
            return new RequestFuture<>();
        }
        if (!this.client.connectionFailed(this.consumerCoordinator)) {
            return RequestFuture.pollNeeded();
        }
        coordinatorDead();
        return RequestFuture.newCoordinatorNeeded();
    }

    public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> set, long j) {
        final RequestFuture<Map<TopicPartition, Long>> newCoordinatorRequestFuture = newCoordinatorRequestFuture(j);
        if (newCoordinatorRequestFuture.isDone()) {
            return newCoordinatorRequestFuture;
        }
        log.debug("Fetching committed offsets for partitions: " + Utils.join(set, ", "));
        sendCoordinator(ApiKeys.OFFSET_FETCH, new OffsetFetchRequest(this.groupId, new ArrayList(set)).toStruct(), new RequestCompletionHandler() { // from class: org.apache.kafka.clients.consumer.internals.Coordinator.2
            @Override // org.apache.kafka.clients.RequestCompletionHandler
            public void onComplete(ClientResponse clientResponse) {
                Coordinator.this.handleOffsetResponse(clientResponse, newCoordinatorRequestFuture);
            }
        }, j);
        return newCoordinatorRequestFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleOffsetResponse(ClientResponse clientResponse, RequestFuture<Map<TopicPartition, Long>> requestFuture) {
        if (clientResponse.wasDisconnected()) {
            handleCoordinatorDisconnect(clientResponse);
            requestFuture.retryWithNewCoordinator();
            return;
        }
        OffsetFetchResponse offsetFetchResponse = new OffsetFetchResponse(clientResponse.responseBody());
        HashMap hashMap = new HashMap(offsetFetchResponse.responseData().size());
        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : offsetFetchResponse.responseData().entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetFetchResponse.PartitionData value = entry.getValue();
            if (value.hasError()) {
                log.debug("Error fetching offset for topic-partition {}: {}", key, Errors.forCode(value.errorCode).exception().getMessage());
                if (value.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
                    requestFuture.retryAfterBackoff();
                } else if (value.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                    coordinatorDead();
                    requestFuture.retryWithNewCoordinator();
                } else if (value.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                    log.debug("Unknown topic or partition for " + key);
                } else {
                    requestFuture.raise(new KafkaException("Unexpected error in fetch offset response: " + Errors.forCode(value.errorCode).exception().getMessage()));
                }
            } else if (value.offset >= 0) {
                hashMap.put(key, Long.valueOf(value.offset));
            } else {
                log.debug("No committed offset for partition " + key);
            }
        }
        if (requestFuture.isDone()) {
            return;
        }
        requestFuture.complete(hashMap);
    }

    public void maybeHeartbeat(long j) {
        if (this.heartbeat.shouldHeartbeat(j) && coordinatorReady(j)) {
            sendCoordinator(ApiKeys.HEARTBEAT, new HeartbeatRequest(this.groupId, this.generation, this.consumerId).toStruct(), new HeartbeatCompletionHandler(), j);
            this.heartbeat.sentHeartbeat(j);
        }
    }

    public long timeToNextHeartbeat(long j) {
        return this.heartbeat.timeToNextHeartbeat(j);
    }

    public boolean hasInFlightRequests() {
        return !coordinatorUnknown() && this.client.inFlightRequestCount(this.consumerCoordinator.idString()) > 0;
    }

    public boolean coordinatorUnknown() {
        return this.consumerCoordinator == null;
    }

    private boolean coordinatorReady(long j) {
        return !coordinatorUnknown() && this.client.ready(this.consumerCoordinator, j);
    }

    public RequestFuture<Void> discoverConsumerCoordinator() {
        long milliseconds = this.time.milliseconds();
        Node leastLoadedNode = this.client.leastLoadedNode(milliseconds);
        if (leastLoadedNode == null) {
            return RequestFuture.metadataRefreshNeeded();
        }
        if (!this.client.ready(leastLoadedNode, milliseconds)) {
            return this.client.connectionFailed(leastLoadedNode) ? RequestFuture.metadataRefreshNeeded() : RequestFuture.pollNeeded();
        }
        final RequestFuture<Void> requestFuture = new RequestFuture<>();
        log.debug("Issuing consumer metadata request to broker {}", Integer.valueOf(leastLoadedNode.id()));
        send(leastLoadedNode, ApiKeys.CONSUMER_METADATA, new ConsumerMetadataRequest(this.groupId).toStruct(), new RequestCompletionHandler() { // from class: org.apache.kafka.clients.consumer.internals.Coordinator.3
            @Override // org.apache.kafka.clients.RequestCompletionHandler
            public void onComplete(ClientResponse clientResponse) {
                Coordinator.this.handleConsumerMetadataResponse(clientResponse, requestFuture);
            }
        }, milliseconds);
        return requestFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleConsumerMetadataResponse(ClientResponse clientResponse, RequestFuture<Void> requestFuture) {
        log.debug("Consumer metadata response {}", clientResponse);
        if (clientResponse.wasDisconnected()) {
            requestFuture.retryAfterMetadataRefresh();
            return;
        }
        ConsumerMetadataResponse consumerMetadataResponse = new ConsumerMetadataResponse(clientResponse.responseBody());
        if (consumerMetadataResponse.errorCode() != Errors.NONE.code()) {
            requestFuture.retryAfterBackoff();
        } else {
            this.consumerCoordinator = new Node(Integer.MAX_VALUE - consumerMetadataResponse.node().id(), consumerMetadataResponse.node().host(), consumerMetadataResponse.node().port());
            requestFuture.complete(null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void coordinatorDead() {
        if (this.consumerCoordinator != null) {
            log.info("Marking the coordinator {} dead.", Integer.valueOf(this.consumerCoordinator.id()));
            this.consumerCoordinator = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleCoordinatorDisconnect(ClientResponse clientResponse) {
        log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected", new Object[]{clientResponse.request(), Integer.valueOf(clientResponse.request().request().header().correlationId()), clientResponse.request().request().destination()});
        coordinatorDead();
    }

    private void sendCoordinator(ApiKeys apiKeys, Struct struct, RequestCompletionHandler requestCompletionHandler, long j) {
        send(this.consumerCoordinator, apiKeys, struct, requestCompletionHandler, j);
    }

    private void send(Node node, ApiKeys apiKeys, Struct struct, RequestCompletionHandler requestCompletionHandler, long j) {
        this.client.send(new ClientRequest(j, true, new RequestSend(node.idString(), this.client.nextRequestHeader(apiKeys), struct), requestCompletionHandler));
    }
}
