/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.internals.Heartbeat;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.ConsumerMetadataRequest;
import org.apache.kafka.common.requests.ConsumerMetadataResponse;
import org.apache.kafka.common.requests.HeartbeatRequest;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestSend;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Coordinator {
    private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
    private final KafkaClient client;
    private final Time time;
    private final String groupId;
    private final Heartbeat heartbeat;
    private final int sessionTimeoutMs;
    private final String assignmentStrategy;
    private final SubscriptionState subscriptions;
    private final CoordinatorMetrics sensors;
    private Node consumerCoordinator;
    private String consumerId;
    private int generation;

    /**
     * Builds the coordinator helper for a single consumer group.
     *
     * <p>Initial membership state: the coordinator node is unknown ({@code null}), the consumer id
     * is the empty string and the generation is {@code -1}. A {@link Heartbeat} tracker is created
     * from the session timeout and the current time reported by {@code time}, and the coordinator
     * sensors are registered on {@code metrics}.
     *
     * @param client             network client used for every request this class sends
     * @param groupId            consumer group id placed in each request
     * @param sessionTimeoutMs   session timeout sent in join-group requests and given to the heartbeat tracker
     * @param assignmentStrategy partition assignment strategy name sent in join-group requests
     * @param subscriptions      subscription state that is updated from coordinator responses
     * @param metrics            registry on which the coordinator sensors and gauges are registered
     * @param metricGrpPrefix    prefix of the metric group name
     * @param metricTags         tags attached to every metric registered here
     * @param time               clock used for timestamps
     */
    public Coordinator(KafkaClient client,
                       String groupId,
                       int sessionTimeoutMs,
                       String assignmentStrategy,
                       SubscriptionState subscriptions,
                       Metrics metrics,
                       String metricGrpPrefix,
                       Map<String, String> metricTags,
                       Time time) {
        // Collaborators and static configuration.
        this.client = client;
        this.time = time;
        this.groupId = groupId;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.assignmentStrategy = assignmentStrategy;
        this.subscriptions = subscriptions;

        // Mutable group-membership state, reset to "not joined yet".
        this.consumerCoordinator = null;
        this.consumerId = "";
        this.generation = -1;

        // Created last: the heartbeat reads the clock and the metrics gauges reference the fields above.
        this.heartbeat = new Heartbeat(sessionTimeoutMs, time.milliseconds());
        this.sensors = new CoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
    }

    /**
     * Sends a join-group request for the currently subscribed topics to the coordinator.
     *
     * <p>If the future obtained from {@code newCoordinatorRequestFuture} is already done, it is
     * returned as is and nothing is sent. Otherwise the response is processed by
     * {@code handleJoinResponse}, which resolves the returned future.
     *
     * @param now current time in milliseconds
     * @return the future tracking the join-group request
     */
    public RequestFuture<Void> assignPartitions(long now) {
        final RequestFuture<Void> joinFuture = newCoordinatorRequestFuture(now);
        if (!joinFuture.isDone()) {
            ArrayList<String> topics = new ArrayList<String>(subscriptions.subscribedTopics());
            log.debug("(Re-)joining group {} with subscribed topics {}", groupId, topics);

            JoinGroupRequest joinRequest = new JoinGroupRequest(groupId, sessionTimeoutMs, topics, consumerId, assignmentStrategy);
            log.debug("Issuing request ({}: {}) to coordinator {}", new Object[]{ApiKeys.JOIN_GROUP, joinRequest, consumerCoordinator.id()});

            // The callback only forwards to handleJoinResponse together with the future to resolve.
            RequestCompletionHandler onJoinResponse = new RequestCompletionHandler() {
                @Override
                public void onComplete(ClientResponse resp) {
                    Coordinator.this.handleJoinResponse(resp, joinFuture);
                }
            };
            sendCoordinator(ApiKeys.JOIN_GROUP, joinRequest.toStruct(), onJoinResponse, now);
        }
        return joinFuture;
    }

    /**
     * Resolves a join-group future from the coordinator's response.
     *
     * <ul>
     *   <li>Disconnect: the coordinator is marked dead and the future retries with a new coordinator.</li>
     *   <li>{@code NONE}: consumer id and generation are stored, a commit refresh is requested, the
     *       request latency is recorded, the assigned partitions are applied and the future completes.</li>
     *   <li>{@code UNKNOWN_CONSUMER_ID}: the consumer id is reset to the empty string and the future retries now.</li>
     *   <li>{@code CONSUMER_COORDINATOR_NOT_AVAILABLE} / {@code NOT_COORDINATOR_FOR_CONSUMER}: the
     *       coordinator is marked dead and the future retries with a new coordinator.</li>
     *   <li>{@code UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY}, {@code INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY},
     *       {@code INVALID_SESSION_TIMEOUT}: the matching API exception is logged and raised on the future.</li>
     *   <li>Anything else: a {@link KafkaException} is raised on the future.</li>
     * </ul>
     *
     * @param response the client response for the join-group request
     * @param future   the future returned to the caller of {@code assignPartitions}
     */
    private void handleJoinResponse(ClientResponse response, RequestFuture<Void> future) {
        if (response.wasDisconnected()) {
            handleCoordinatorDisconnect(response);
            future.retryWithNewCoordinator();
            return;
        }

        JoinGroupResponse joinResponse = new JoinGroupResponse(response.responseBody());
        final short code = joinResponse.errorCode();

        if (code == Errors.NONE.code()) {
            consumerId = joinResponse.consumerId();
            generation = joinResponse.generationId();
            subscriptions.needRefreshCommits();
            log.debug("Joined group: {}", response);
            sensors.partitionReassignments.record(response.requestLatencyMs());
            subscriptions.changePartitionAssignment(joinResponse.assignedPartitions());
            future.complete(null);
            return;
        }

        if (code == Errors.UNKNOWN_CONSUMER_ID.code()) {
            consumerId = "";
            log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.", groupId);
            future.retryNow();
            return;
        }

        if (code == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || code == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
            coordinatorDead();
            log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", groupId);
            future.retryWithNewCoordinator();
            return;
        }

        if (code == Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()
                || code == Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code()
                || code == Errors.INVALID_SESSION_TIMEOUT.code()) {
            ApiException failure = Errors.forCode(code).exception();
            log.error("Attempt to join group {} failed due to: {}", groupId, failure.getMessage());
            future.raise(failure);
            return;
        }

        future.raise(new KafkaException("Unexpected error in join group response: " + Errors.forCode(code).exception().getMessage()));
    }

    /**
     * Sends an offset-commit request for the given offsets to the coordinator.
     *
     * <p>If the future obtained from {@code newCoordinatorRequestFuture} is already done, it is
     * returned unchanged. If {@code offsets} is empty the future is completed immediately and no
     * request is sent. Otherwise the response is handled by a {@code CommitOffsetCompletionHandler}.
     *
     * @param offsets offsets to commit, keyed by partition
     * @param now     current time in milliseconds
     * @return the future tracking the commit
     */
    public RequestFuture<Void> commitOffsets(Map<TopicPartition, Long> offsets, long now) {
        RequestFuture<Void> commitFuture = newCoordinatorRequestFuture(now);
        if (!commitFuture.isDone()) {
            if (offsets.isEmpty()) {
                // Nothing to commit: succeed without talking to the coordinator.
                commitFuture.complete(null);
            } else {
                Map<TopicPartition, OffsetCommitRequest.PartitionData> partitionData =
                        new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
                for (Map.Entry<TopicPartition, Long> toCommit : offsets.entrySet()) {
                    // Each partition carries its offset plus an empty string as second argument
                    // (presumably the commit metadata -- confirm against OffsetCommitRequest.PartitionData).
                    partitionData.put(toCommit.getKey(), new OffsetCommitRequest.PartitionData(toCommit.getValue(), ""));
                }
                // NOTE(review): -1L is the fourth constructor argument; its meaning is defined by
                // OffsetCommitRequest, not visible here.
                OffsetCommitRequest commitRequest = new OffsetCommitRequest(groupId, generation, consumerId, -1L, partitionData);
                sendCoordinator(ApiKeys.OFFSET_COMMIT, commitRequest.toStruct(), new CommitOffsetCompletionHandler(offsets, commitFuture), now);
            }
        }
        return commitFuture;
    }

    /**
     * Creates the future for a request that must go to the coordinator, or a pre-resolved future
     * describing what has to happen first.
     *
     * <ul>
     *   <li>Coordinator unknown: {@code RequestFuture.newCoordinatorNeeded()}.</li>
     *   <li>Coordinator connection ready: a fresh, unresolved future.</li>
     *   <li>Coordinator connection failed: the coordinator is marked dead and
     *       {@code RequestFuture.newCoordinatorNeeded()} is returned.</li>
     *   <li>Otherwise: {@code RequestFuture.pollNeeded()}.</li>
     * </ul>
     *
     * @param now current time in milliseconds
     * @param <T> value type of the future
     * @return a future of the kind described above
     */
    private <T> RequestFuture<T> newCoordinatorRequestFuture(long now) {
        if (this.coordinatorUnknown()) {
            return RequestFuture.newCoordinatorNeeded();
        }
        if (this.client.ready(this.consumerCoordinator, now)) {
            // Parameterized instead of the raw type, so no unchecked conversion is needed.
            return new RequestFuture<T>();
        }
        if (this.client.connectionFailed(this.consumerCoordinator)) {
            this.coordinatorDead();
            return RequestFuture.newCoordinatorNeeded();
        }
        return RequestFuture.pollNeeded();
    }

    /**
     * Sends an offset-fetch request for the given partitions to the coordinator.
     *
     * <p>If the future obtained from {@code newCoordinatorRequestFuture} is already done, it is
     * returned as is and nothing is sent. Otherwise the response is processed by
     * {@code handleOffsetResponse}, which resolves the returned future.
     *
     * @param partitions partitions whose committed offsets are requested
     * @param now        current time in milliseconds
     * @return the future tracking the fetch
     */
    public RequestFuture<Map<TopicPartition, Long>> fetchOffsets(Set<TopicPartition> partitions, long now) {
        final RequestFuture<Map<TopicPartition, Long>> future = this.newCoordinatorRequestFuture(now);
        if (future.isDone()) {
            return future;
        }
        // Joining the partition list is only worth doing when the message will actually be emitted.
        if (log.isDebugEnabled()) {
            log.debug("Fetching committed offsets for partitions: {}", Utils.join(partitions, ", "));
        }
        OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions));
        RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse resp) {
                Coordinator.this.handleOffsetResponse(resp, future);
            }
        };
        this.sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
        return future;
    }

    /**
     * Resolves an offset-fetch future from the coordinator's response.
     *
     * <p>On disconnect the coordinator is marked dead and the future retries with a new
     * coordinator. Otherwise every partition in the response is examined:
     * <ul>
     *   <li>no error and offset {@code >= 0}: the offset is collected;</li>
     *   <li>no error and negative offset: only a debug message is logged;</li>
     *   <li>{@code OFFSET_LOAD_IN_PROGRESS}: the future retries after backoff;</li>
     *   <li>{@code NOT_COORDINATOR_FOR_CONSUMER}: the coordinator is marked dead and the future
     *       retries with a new coordinator;</li>
     *   <li>{@code UNKNOWN_TOPIC_OR_PARTITION}: only a debug message is logged;</li>
     *   <li>any other error: a {@link KafkaException} is raised on the future.</li>
     * </ul>
     * If the future is still not done after the loop, it completes with the collected offsets.
     *
     * @param resp   the client response for the offset-fetch request
     * @param future the future returned to the caller of {@code fetchOffsets}
     */
    private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
        if (resp.wasDisconnected()) {
            handleCoordinatorDisconnect(resp);
            future.retryWithNewCoordinator();
            return;
        }

        OffsetFetchResponse fetchResponse = new OffsetFetchResponse(resp.responseBody());
        Map<TopicPartition, OffsetFetchResponse.PartitionData> perPartition = fetchResponse.responseData();
        HashMap<TopicPartition, Long> committed = new HashMap<TopicPartition, Long>(perPartition.size());

        // NOTE(review): the loop keeps going after the future has been retried or raised, so several
        // partitions may each act on the same future; how RequestFuture treats that is not visible here.
        for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> item : perPartition.entrySet()) {
            TopicPartition tp = item.getKey();
            OffsetFetchResponse.PartitionData data = item.getValue();

            if (!data.hasError()) {
                if (data.offset >= 0L) {
                    committed.put(tp, data.offset);
                } else {
                    log.debug("No committed offset for partition " + tp);
                }
                continue;
            }

            log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode).exception().getMessage());
            if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
                future.retryAfterBackoff();
            } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                coordinatorDead();
                future.retryWithNewCoordinator();
            } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                log.debug("Unknown topic or partition for " + tp);
            } else {
                future.raise(new KafkaException("Unexpected error in fetch offset response: " + Errors.forCode(data.errorCode).exception().getMessage()));
            }
        }

        if (!future.isDone()) {
            future.complete(committed);
        }
    }

    /**
     * Sends a heartbeat to the coordinator when the heartbeat tracker says one is due and the
     * coordinator connection is ready; otherwise does nothing.
     *
     * @param now current time in milliseconds
     */
    public void maybeHeartbeat(long now) {
        // Same evaluation order as a short-circuit AND: readiness is only checked when a heartbeat is due.
        if (!heartbeat.shouldHeartbeat(now) || !coordinatorReady(now)) {
            return;
        }
        HeartbeatRequest heartbeatRequest = new HeartbeatRequest(groupId, generation, consumerId);
        sendCoordinator(ApiKeys.HEARTBEAT, heartbeatRequest.toStruct(), new HeartbeatCompletionHandler(), now);
        heartbeat.sentHeartbeat(now);
    }

    /**
     * Reports the value of {@code Heartbeat.timeToNextHeartbeat} for the given time.
     *
     * @param now current time in milliseconds
     * @return whatever the heartbeat tracker returns for {@code now}
     */
    public long timeToNextHeartbeat(long now) {
        final long untilNext = heartbeat.timeToNextHeartbeat(now);
        return untilNext;
    }

    /**
     * Tells whether the client reports at least one in-flight request to the coordinator.
     *
     * @return {@code false} when the coordinator is unknown; otherwise whether the in-flight
     *         request count for the coordinator's node id is greater than zero
     */
    public boolean hasInFlightRequests() {
        if (coordinatorUnknown()) {
            return false;
        }
        return client.inFlightRequestCount(consumerCoordinator.idString()) > 0;
    }

    /**
     * Tells whether no coordinator node is currently recorded.
     *
     * @return {@code true} when the coordinator field is {@code null}
     */
    public boolean coordinatorUnknown() {
        return consumerCoordinator == null;
    }

    /**
     * Tells whether a coordinator is known and the client reports it ready at {@code now}.
     *
     * @param now current time in milliseconds
     * @return {@code false} when the coordinator is unknown; otherwise the result of {@code client.ready}
     */
    private boolean coordinatorReady(long now) {
        if (coordinatorUnknown()) {
            return false;
        }
        return client.ready(consumerCoordinator, now);
    }

    /**
     * Asks the least-loaded broker which node coordinates this consumer group.
     *
     * <ul>
     *   <li>No least-loaded node: {@code RequestFuture.metadataRefreshNeeded()}.</li>
     *   <li>Node ready: a consumer-metadata request is sent; the response is processed by
     *       {@code handleConsumerMetadataResponse}, which resolves the returned future.</li>
     *   <li>Node not ready and its connection failed: {@code RequestFuture.metadataRefreshNeeded()}.</li>
     *   <li>Node not ready otherwise: {@code RequestFuture.pollNeeded()}.</li>
     * </ul>
     *
     * @return a future of the kind described above
     */
    public RequestFuture<Void> discoverConsumerCoordinator() {
        final long now = time.milliseconds();
        final Node broker = client.leastLoadedNode(now);
        if (broker == null) {
            return RequestFuture.metadataRefreshNeeded();
        }

        if (client.ready(broker, now)) {
            final RequestFuture<Void> discovery = new RequestFuture<Void>();
            log.debug("Issuing consumer metadata request to broker {}", broker.id());
            ConsumerMetadataRequest lookup = new ConsumerMetadataRequest(groupId);
            RequestCompletionHandler onLookupResponse = new RequestCompletionHandler() {
                @Override
                public void onComplete(ClientResponse resp) {
                    Coordinator.this.handleConsumerMetadataResponse(resp, discovery);
                }
            };
            send(broker, ApiKeys.CONSUMER_METADATA, lookup.toStruct(), onLookupResponse, now);
            return discovery;
        }

        // Not ready: a failed connection calls for fresh metadata, anything else just needs another poll.
        if (client.connectionFailed(broker)) {
            return RequestFuture.metadataRefreshNeeded();
        }
        return RequestFuture.pollNeeded();
    }

    /**
     * Resolves a coordinator-discovery future from the consumer-metadata response.
     *
     * <p>On disconnect the future retries after a metadata refresh. On error code {@code NONE} the
     * coordinator node is recorded and the future completes. Any other error code makes the future
     * retry after backoff.
     *
     * @param resp   the client response for the consumer-metadata request
     * @param future the future returned to the caller of {@code discoverConsumerCoordinator}
     */
    private void handleConsumerMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Consumer metadata response {}", resp);
        if (resp.wasDisconnected()) {
            future.retryAfterMetadataRefresh();
            return;
        }

        ConsumerMetadataResponse metadata = new ConsumerMetadataResponse(resp.responseBody());
        if (metadata.errorCode() != Errors.NONE.code()) {
            future.retryAfterBackoff();
            return;
        }

        // The recorded node keeps the broker's host and port but uses Integer.MAX_VALUE minus the
        // broker id as its own id (presumably to keep coordinator traffic apart from the broker's
        // regular connection -- confirm).
        Node reported = metadata.node();
        consumerCoordinator = new Node(Integer.MAX_VALUE - reported.id(), reported.host(), reported.port());
        future.complete(null);
    }

    /**
     * Forgets the current coordinator, logging its id first. Does nothing when no coordinator is recorded.
     */
    private void coordinatorDead() {
        if (consumerCoordinator == null) {
            return;
        }
        log.info("Marking the coordinator {} dead.", consumerCoordinator.id());
        consumerCoordinator = null;
    }

    /**
     * Logs the request that was cancelled by a coordinator disconnect and marks the coordinator dead.
     *
     * @param response the response flagged as disconnected
     */
    private void handleCoordinatorDisconnect(ClientResponse response) {
        ClientRequest cancelled = response.request();
        int correlationId = cancelled.request().header().correlationId();
        log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected", new Object[]{cancelled, correlationId, cancelled.request().destination()});
        coordinatorDead();
    }

    /**
     * Sends a request to the currently recorded coordinator node by delegating to {@code send}.
     *
     * @param api     API key of the request
     * @param request serialized request body
     * @param handler callback invoked with the response
     * @param now     current time in milliseconds
     */
    private void sendCoordinator(ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
        send(consumerCoordinator, api, request, handler, now);
    }

    /**
     * Wraps a request body in a header obtained from the client and hands it to the client for sending.
     *
     * @param node    destination node; its {@code idString()} is used as the destination
     * @param api     API key used to obtain the request header
     * @param request serialized request body
     * @param handler callback attached to the client request
     * @param now     current time in milliseconds, used as the first {@code ClientRequest} argument
     */
    private void send(Node node, ApiKeys api, Struct request, RequestCompletionHandler handler, long now) {
        // The header is requested from the client before the destination id is read from the node.
        RequestHeader requestHeader = client.nextRequestHeader(api);
        RequestSend payload = new RequestSend(node.idString(), requestHeader, request);
        ClientRequest clientRequest = new ClientRequest(now, true, payload, handler);
        client.send(clientRequest);
    }

    /**
     * Sensors and gauges describing this coordinator's commits, heartbeats and partition
     * reassignments. All metrics are registered in the group {@code <prefix>-coordinator-metrics}.
     */
    private class CoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor commitLatency;
        public final Sensor heartbeatLatency;
        public final Sensor partitionReassignments;

        /**
         * Registers the coordinator sensors and gauges on the given registry.
         *
         * @param metrics         registry that owns the sensors and gauges
         * @param metricGrpPrefix prefix of the metric group name
         * @param tags            tags attached to every metric name
         */
        public CoordinatorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> tags) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            // Offset commit latency: average, maximum and rate.
            this.commitLatency = metrics.sensor("commit-latency");
            this.commitLatency.add(new MetricName("commit-latency-avg", this.metricGrpName, "The average time taken for a commit request", tags), new Avg());
            this.commitLatency.add(new MetricName("commit-latency-max", this.metricGrpName, "The max time taken for a commit request", tags), new Max());
            this.commitLatency.add(new MetricName("commit-rate", this.metricGrpName, "The number of commit calls per second", tags), new Rate(new Count()));

            // Heartbeat latency: maximum and rate.
            this.heartbeatLatency = metrics.sensor("heartbeat-latency");
            this.heartbeatLatency.add(new MetricName("heartbeat-response-time-max", this.metricGrpName, "The max time taken to receive a response to a hearbeat request", tags), new Max());
            this.heartbeatLatency.add(new MetricName("heartbeat-rate", this.metricGrpName, "The average number of heartbeats per second", tags), new Rate(new Count()));

            // Partition reassignment latency: average, maximum and rate.
            this.partitionReassignments = metrics.sensor("reassignment-latency");
            this.partitionReassignments.add(new MetricName("reassignment-time-avg", this.metricGrpName, "The average time taken for a partition reassignment", tags), new Avg());
            // The "-max" metric must be backed by Max; it was previously backed by Avg and so
            // duplicated the average.
            this.partitionReassignments.add(new MetricName("reassignment-time-max", this.metricGrpName, "The max time taken for a partition reassignment", tags), new Max());
            this.partitionReassignments.add(new MetricName("reassignment-rate", this.metricGrpName, "The number of partition reassignments per second", tags), new Rate(new Count()));

            // Gauge: size of the current partition assignment, read at measurement time.
            Measurable numParts = new Measurable() {
                @Override
                public double measure(MetricConfig config, long now) {
                    return Coordinator.this.subscriptions.assignedPartitions().size();
                }
            };
            metrics.addMetric(new MetricName("assigned-partitions", this.metricGrpName, "The number of partitions currently assigned to this consumer", tags), numParts);

            // Gauge: whole seconds elapsed between the last heartbeat send and the measurement time.
            Measurable lastHeartbeat = new Measurable() {
                @Override
                public double measure(MetricConfig config, long now) {
                    return TimeUnit.SECONDS.convert(now - Coordinator.this.heartbeat.lastHeartbeatSend(), TimeUnit.MILLISECONDS);
                }
            };
            metrics.addMetric(new MetricName("last-heartbeat-seconds-ago", this.metricGrpName, "The number of seconds since the last controller heartbeat", tags), lastHeartbeat);
        }
    }

    private class CommitOffsetCompletionHandler
    implements RequestCompletionHandler {
        private final Map<TopicPartition, Long> offsets;
        private final RequestFuture<Void> future;

        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
            this.offsets = offsets;
            this.future = future;
        }

        @Override
        public void onComplete(ClientResponse resp) {
            if (resp.wasDisconnected()) {
                Coordinator.this.handleCoordinatorDisconnect(resp);
                this.future.retryWithNewCoordinator();
            } else {
                OffsetCommitResponse commitResponse = new OffsetCommitResponse(resp.responseBody());
                for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    short errorCode = entry.getValue();
                    long offset = this.offsets.get(tp);
                    if (errorCode == Errors.NONE.code()) {
                        log.debug("Committed offset {} for partition {}", (Object)offset, (Object)tp);
                        Coordinator.this.subscriptions.committed(tp, offset);
                        continue;
                    }
                    if (errorCode == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                        Coordinator.this.coordinatorDead();
                        this.future.retryWithNewCoordinator();
                        continue;
                    }
                    this.future.retryAfterBackoff();
                    log.error("Error committing partition {} at offset {}: {}", new Object[]{tp, offset, Errors.forCode(errorCode).exception().getMessage()});
                }
                if (!this.future.isDone()) {
                    this.future.complete(null);
                }
            }
            ((Coordinator)Coordinator.this).sensors.commitLatency.record(resp.requestLatencyMs());
        }
    }

    private class HeartbeatCompletionHandler
    implements RequestCompletionHandler {
        private HeartbeatCompletionHandler() {
        }

        @Override
        public void onComplete(ClientResponse resp) {
            if (resp.wasDisconnected()) {
                Coordinator.this.handleCoordinatorDisconnect(resp);
            } else {
                HeartbeatResponse response = new HeartbeatResponse(resp.responseBody());
                if (response.errorCode() == Errors.NONE.code()) {
                    log.debug("Received successful heartbeat response.");
                } else if (response.errorCode() == Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code() || response.errorCode() == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                    log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
                    Coordinator.this.coordinatorDead();
                } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) {
                    log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
                    Coordinator.this.subscriptions.needReassignment();
                } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) {
                    log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
                    Coordinator.this.consumerId = "";
                    Coordinator.this.subscriptions.needReassignment();
                } else {
                    throw new KafkaException("Unexpected error in heartbeat response: " + Errors.forCode(response.errorCode()).exception().getMessage());
                }
            }
            ((Coordinator)Coordinator.this).sensors.heartbeatLatency.record(resp.requestLatencyMs());
        }
    }
}

