/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignorAdapter;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.clients.mapr.util.MapRTopicUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class KafkaConsumer<K, V>
implements Consumer<K, V> {
    private static final String CLIENT_ID_METRIC_TAG = "client-id";
    private static final long NO_CURRENT_THREAD = -1L;
    private static final String JMX_PREFIX = "kafka.consumer";
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30000L;
    Metrics metrics;
    KafkaConsumerMetrics kafkaConsumerMetrics;
    private Logger log;
    private String clientId;
    private Optional<String> groupId;
    private ConsumerCoordinator coordinator;
    private Deserializer<K> keyDeserializer;
    private Deserializer<V> valueDeserializer;
    private Fetcher<K, V> fetcher;
    private ConsumerInterceptors<K, V> interceptors;
    private Time time;
    private ConsumerNetworkClient client;
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private long retryBackoffMs;
    private long requestTimeoutMs;
    private int defaultApiTimeoutMs;
    private volatile boolean closed = false;
    private List<ConsumerPartitionAssignor> assignors;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refcount = new AtomicInteger(0);
    private final LogContext logContext;
    private final ConsumerConfig config;
    private boolean isStreams = false;
    private boolean isStreamsClosed = false;
    private Consumer<K, V> consumerDriver = null;
    private String defaultStream = null;
    private ConsumerGroupMetadata groupMetadataPlug;
    private boolean cachedSubscriptionHashAllFetchPositions;

    public KafkaConsumer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        this.clientId = config.getString("client.id");
        this.logContext = groupRebalanceConfig.groupInstanceId.isPresent() ? new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + ", clientId=" + this.clientId + ", groupId=" + this.groupId.orElse("null") + "] ") : new LogContext("[Consumer clientId=" + this.clientId + ", groupId=" + this.groupId.orElse("null") + "] ");
        this.log = this.logContext.logger(this.getClass());
        this.groupId.ifPresent(groupIdStr -> {
            if (groupIdStr.isEmpty()) {
                this.log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
            }
        });
        this.log.debug("Starting the Kafka consumer");
        this.config = config;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.closed = false;
        this.isStreams = false;
        this.isStreamsClosed = false;
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance("key.deserializer", Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore("key.deserializer");
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance("value.deserializer", Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore("value.deserializer");
            this.valueDeserializer = valueDeserializer;
        }
        this.defaultStream = null;
        try {
            this.defaultStream = config.getString("streams.consumer.default.stream");
            if (this.defaultStream.equals("")) {
                this.defaultStream = null;
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (this.defaultStream != null) {
            this.initializeConsumer(this.defaultStream + ":");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeConsumer(String topic) {
        KafkaConsumer kafkaConsumer = this;
        synchronized (kafkaConsumer) {
            if (this.isStreamsClosed) {
                this.log.error("cannot initialize consumer. already closed.");
                return;
            }
            if (this.consumerDriver != null) {
                this.log.debug("initialized consumer already.");
                return;
            }
            Map<String, Object> userProvidedConfigs = this.config.originals();
            userProvidedConfigs.put("client.id", this.clientId);
            List interceptorList = new ConsumerConfig(userProvidedConfigs, false).getConfiguredInstances("interceptor.classes", ConsumerInterceptor.class);
            this.interceptors = new ConsumerInterceptors(interceptorList);
            this.time = Time.SYSTEM;
            if (topic.startsWith("/") || topic.contains(":")) {
                try {
                    Class.forName("com.mapr.kafka.eventstreams.impl.MarlinClient");
                }
                catch (Throwable e) {
                    throw new RuntimeException(String.format("Error occurred while instantiating class, com.mapr.kafka.eventstreams.impl.MarlinClient. " + e.getMessage(), new Object[0]), e);
                }
                GenericHFactory consumerFactory = new GenericHFactory();
                Consumer ac = (Consumer)consumerFactory.getImplementorInstance("com.mapr.kafka.eventstreams.impl.listener.MarlinListenerV10", new Object[]{this.config, this.keyDeserializer, this.valueDeserializer, this.interceptors}, ConsumerConfig.class, Deserializer.class, Deserializer.class, ConsumerInterceptors.class);
                this.isStreams = true;
                this.consumerDriver = ac;
                OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(this.config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
                this.subscriptions = new SubscriptionState(this.logContext, offsetResetStrategy);
            } else {
                this.isStreams = false;
                this.consumerDriver = this;
                try {
                    boolean enableAutoCommit = this.config.maybeOverrideEnableAutoCommit();
                    this.log.debug("Initializing the Kafka consumer");
                    this.requestTimeoutMs = this.config.getInt("request.timeout.ms").intValue();
                    this.defaultApiTimeoutMs = this.config.getInt("default.api.timeout.ms");
                    this.time = Time.SYSTEM;
                    this.metrics = KafkaConsumer.buildMetrics(this.config, this.time, this.clientId);
                    this.retryBackoffMs = this.config.getLong("retry.backoff.ms");
                    if (this.keyDeserializer == null) {
                        this.keyDeserializer = this.config.getConfiguredInstance("key.deserializer", Deserializer.class);
                        this.keyDeserializer.configure(this.config.originals(), true);
                    } else {
                        this.config.ignore("key.deserializer");
                        this.keyDeserializer = this.keyDeserializer;
                    }
                    if (this.valueDeserializer == null) {
                        this.valueDeserializer = this.config.getConfiguredInstance("value.deserializer", Deserializer.class);
                        this.valueDeserializer.configure(this.config.originals(), false);
                    } else {
                        this.config.ignore("value.deserializer");
                        this.valueDeserializer = this.valueDeserializer;
                    }
                    OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(this.config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
                    this.subscriptions = new SubscriptionState(this.logContext, offsetResetStrategy);
                    ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(this.keyDeserializer, this.valueDeserializer, this.metrics.reporters(), interceptorList);
                    this.metadata = new ConsumerMetadata(this.retryBackoffMs, this.config.getLong("metadata.max.age.ms"), this.config.getBoolean("exclude.internal.topics") == false, this.config.getBoolean("allow.auto.create.topics"), this.subscriptions, this.logContext, clusterResourceListeners);
                    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(this.config.getList("bootstrap.servers"), this.config.getString("client.dns.lookup"));
                    this.metadata.bootstrap(addresses);
                    String metricGrpPrefix = "consumer";
                    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
                    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.config, this.time, this.logContext);
                    IsolationLevel isolationLevel = IsolationLevel.valueOf(this.config.getString("isolation.level").toUpperCase(Locale.ROOT));
                    Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(this.metrics, metricsRegistry);
                    int heartbeatIntervalMs = this.config.getInt("heartbeat.interval.ms");
                    ApiVersions apiVersions = new ApiVersions();
                    NetworkClient netClient = new NetworkClient(new Selector(this.config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, channelBuilder, this.logContext), this.metadata, this.clientId, 100, this.config.getLong("reconnect.backoff.ms"), this.config.getLong("reconnect.backoff.max.ms"), this.config.getInt("send.buffer.bytes"), this.config.getInt("receive.buffer.bytes"), this.config.getInt("request.timeout.ms"), ClientDnsLookup.forConfig(this.config.getString("client.dns.lookup")), this.time, true, apiVersions, throttleTimeSensor, this.logContext);
                    this.client = new ConsumerNetworkClient(this.logContext, netClient, this.metadata, this.time, this.retryBackoffMs, this.config.getInt("request.timeout.ms"), heartbeatIntervalMs);
                    this.assignors = PartitionAssignorAdapter.getAssignorInstances(this.config.getList("partition.assignment.strategy"), this.config.originals());
                    this.coordinator = !this.groupId.isPresent() ? null : new ConsumerCoordinator(new GroupRebalanceConfig(this.config, GroupRebalanceConfig.ProtocolType.CONSUMER), this.logContext, this.client, this.assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, this.time, enableAutoCommit, this.config.getLong("auto.commit.interval.ms"), this.interceptors, this.config.getBoolean("internal.throw.on.fetch.stable.offset.unsupported"));
                    this.fetcher = new Fetcher<K, V>(this.logContext, this.client, this.config.getInt("fetch.min.bytes"), this.config.getInt("fetch.max.bytes"), this.config.getInt("fetch.max.wait.ms"), this.config.getInt("max.partition.fetch.bytes"), this.config.getInt("max.poll.records"), this.config.getBoolean("check.crcs"), this.config.getString("client.rack"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricsRegistry, this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel, apiVersions);
                    this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, metricGrpPrefix);
                    this.config.logUnused();
                    AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, this.time.milliseconds());
                    this.log.debug("Kafka consumer initialized");
                }
                catch (Throwable t) {
                    if (this.log != null) {
                        this.close(0L, true);
                    }
                    throw new KafkaException("Failed to construct kafka consumer", t);
                }
            }
        }
    }

    KafkaConsumer(LogContext logContext, String clientId, ConsumerCoordinator coordinator, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, Fetcher<K, V> fetcher, ConsumerInterceptors<K, V> interceptors, Time time, ConsumerNetworkClient client, Metrics metrics, SubscriptionState subscriptions, ConsumerMetadata metadata, long retryBackoffMs, long requestTimeoutMs, int defaultApiTimeoutMs, List<ConsumerPartitionAssignor> assignors, String groupId) {
        this.isStreams = false;
        this.consumerDriver = this;
        this.config = null;
        this.logContext = logContext;
        this.log = logContext.logger(this.getClass());
        this.clientId = clientId;
        this.coordinator = coordinator;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.fetcher = fetcher;
        this.interceptors = Objects.requireNonNull(interceptors);
        this.time = time;
        this.client = client;
        this.metrics = metrics;
        this.subscriptions = subscriptions;
        this.metadata = metadata;
        this.retryBackoffMs = retryBackoffMs;
        this.requestTimeoutMs = requestTimeoutMs;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.assignors = assignors;
        this.groupId = Optional.ofNullable(groupId);
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
    }

    private static Metrics buildMetrics(ConsumerConfig config, Time time, String clientId) {
        Map<String, String> metricsTags = Collections.singletonMap(CLIENT_ID_METRIC_TAG, clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", clientId));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(config.originals());
        reporters.add(jmxReporter);
        KafkaMetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, config.originalsWithPrefix("metrics.context."));
        return new Metrics(metricConfig, reporters, time, metricsContext);
    }

    private boolean useDefaultStreamName(String topicname) {
        return !topicname.startsWith("/");
    }

    private String addDefaultStreamNameToTopicName(String topicname) {
        return this.defaultStream + ":" + topicname;
    }

    private TopicPartition addDefaultStreamNameToTopicPartition(TopicPartition tp) {
        return new TopicPartition(this.addDefaultStreamNameToTopicName(tp.topic()), tp.partition());
    }

    private Set<TopicPartition> getNewTopicPartitionWithDefaultStream(Set<TopicPartition> tp) {
        return tp.stream().map(this::getNewTopicPartitionWithDefaultStream).collect(Collectors.toSet());
    }

    private TopicPartition getNewTopicPartitionWithDefaultStream(TopicPartition tp) {
        if (this.defaultStream != null && this.useDefaultStreamName(tp.topic())) {
            return this.addDefaultStreamNameToTopicPartition(tp);
        }
        return tp;
    }

    private String getNewTopicNameWithDefaultStream(String topic) {
        if (this.defaultStream != null && this.useDefaultStreamName(topic)) {
            return this.addDefaultStreamNameToTopicName(topic);
        }
        return topic;
    }

    private boolean checkIfPartitionsNeedDefaultStream(Collection<TopicPartition> partitions) {
        boolean needDefault = false;
        if (this.defaultStream != null) {
            for (TopicPartition tp : partitions) {
                if (!this.useDefaultStreamName(tp.topic())) continue;
                needDefault = true;
                break;
            }
        }
        return needDefault;
    }

    private boolean checkIfTopicsNeedDefaultStream(Collection<String> topics) {
        boolean needDefault = false;
        if (this.defaultStream != null) {
            for (String topic : topics) {
                if (!this.useDefaultStreamName(topic)) continue;
                needDefault = true;
                break;
            }
        }
        return needDefault;
    }

    private Collection<TopicPartition> getNewPartitionCollectionWithDefaultStream(Collection<TopicPartition> partitions) {
        if (this.checkIfPartitionsNeedDefaultStream(partitions)) {
            ArrayList<TopicPartition> newPartitions = new ArrayList<TopicPartition>(partitions.size());
            for (TopicPartition partition : partitions) {
                if (this.useDefaultStreamName(partition.topic())) {
                    partition = this.addDefaultStreamNameToTopicPartition(partition);
                }
                newPartitions.add(partition);
            }
            return newPartitions;
        }
        return partitions;
    }

    private Map<TopicPartition, ? extends Object> getNewPartitionMapWithDefaultStream(Map<TopicPartition, ? extends Object> partitions) {
        if (this.checkIfPartitionsNeedDefaultStream(partitions.keySet())) {
            HashMap<TopicPartition, Object> newPartitions = new HashMap<TopicPartition, Object>();
            for (Map.Entry<TopicPartition, ? extends Object> entry : partitions.entrySet()) {
                TopicPartition tp = entry.getKey();
                if (this.useDefaultStreamName(tp.topic())) {
                    tp = this.addDefaultStreamNameToTopicPartition(tp);
                }
                newPartitions.put(tp, entry.getValue());
            }
            return newPartitions;
        }
        return partitions;
    }

    private Collection<String> getNewTopicCollectionWithDefaultStream(Collection<String> topics) {
        if (this.checkIfTopicsNeedDefaultStream(topics)) {
            ArrayList<String> newTopics = new ArrayList<String>(topics.size());
            for (String topic : topics) {
                if (this.useDefaultStreamName(topic)) {
                    topic = this.addDefaultStreamNameToTopicName(topic);
                }
                newTopics.add(topic);
            }
            return newTopics;
        }
        return topics;
    }

    @Override
    public Set<TopicPartition> assignment() {
        if (this.consumerDriver == null) {
            return new HashSet<TopicPartition>();
        }
        if (this.isStreams) {
            return this.consumerDriver.assignment();
        }
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
            return set;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Set<String> subscription() {
        if (this.consumerDriver == null) {
            return new HashSet<String>();
        }
        if (this.isStreams) {
            return this.consumerDriver.subscription();
        }
        this.acquireAndEnsureOpen();
        try {
            Set<String> set = Collections.unmodifiableSet(new HashSet<String>(this.subscriptions.subscription()));
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        }
        if (topics.isEmpty()) {
            this.unsubscribe();
            return;
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(topics.iterator().next());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                if (!this.subscriptions.assignedPartitions().isEmpty()) {
                    this.log.error("Consumer was not unsubscribed from assigned patitions before subscribe");
                    throw new IllegalStateException("Subscription to topics and assigning to partitions and pattern are mutually exclusive");
                }
                topics = this.getNewTopicCollectionWithDefaultStream(topics);
                this.consumerDriver.subscribe(topics, listener);
                this.subscriptions.subscribe(new HashSet<String>(topics), listener);
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            for (String topic : topics) {
                if (topic != null && !topic.trim().isEmpty()) continue;
                throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            this.throwIfNoAssignorsConfigured();
            this.fetcher.clearBufferedDataForUnassignedTopics(topics);
            this.log.info("Subscribed to topic(s): {}", (Object)Utils.join(topics, ", "));
            if (this.subscriptions.subscribe(new HashSet<String>(topics), listener)) {
                this.metadata.requestUpdateForNewTopics();
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
        this.subscribe((Collection<String>)topics, listener);
    }

    @Override
    public void subscribe(Collection<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public void subscribe(List<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(pattern.toString());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                if (!this.subscriptions.assignedPartitions().isEmpty()) {
                    this.log.error("Consumer was not unsubscribed from assigned patitions before subscribe");
                    throw new IllegalStateException("Subscription to topics and assigning to partitions and pattern are mutually exclusive");
                }
                pattern = Pattern.compile(this.getNewTopicNameWithDefaultStream(pattern.toString()));
                this.consumerDriver.subscribe(pattern, listener);
                this.subscriptions.subscribe(pattern, listener);
            }
            finally {
                this.release();
            }
        }
        this.maybeThrowInvalidGroupIdException();
        if (pattern == null || pattern.toString().equals("")) {
            throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
        }
        this.acquireAndEnsureOpen();
        try {
            this.throwIfNoAssignorsConfigured();
            this.log.info("Subscribed to pattern: '{}'", (Object)pattern);
            this.subscriptions.subscribe(pattern, listener);
            this.coordinator.updatePatternSubscription(this.metadata.fetch());
            this.metadata.requestUpdateForNewTopics();
        }
        finally {
            this.release();
        }
    }

    @Override
    public void subscribe(Pattern pattern) {
        this.subscribe(pattern, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public void unsubscribe() {
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                this.consumerDriver.unsubscribe();
                this.subscriptions.unsubscribe();
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            this.fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
            if (this.coordinator != null) {
                this.coordinator.onLeavePrepare();
                this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics");
            }
            this.subscriptions.unsubscribe();
            this.log.info("Unsubscribed all topics or patterns and assigned partitions");
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            this.unsubscribe();
            return;
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                if (this.subscriptions.hasPatternSubscription() || !this.subscriptions.subscription().isEmpty()) {
                    this.log.error("Consumer was not unsubscribed before assign");
                    throw new IllegalStateException("Subscription to topics and assigning to partitions and pattern are mutually exclusive");
                }
                partitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
                this.consumerDriver.assign(partitions);
                this.subscriptions.assignFromUser(new HashSet<TopicPartition>(partitions));
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            for (TopicPartition tp : partitions) {
                String topic;
                String string = topic = tp != null ? tp.topic() : null;
                if (topic != null && !topic.trim().isEmpty()) continue;
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
            this.fetcher.clearBufferedDataForUnassignedPartitions(partitions);
            if (this.coordinator != null) {
                this.coordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());
            }
            this.log.info("Subscribed to partition(s): {}", (Object)Utils.join(partitions, ", "));
            if (this.subscriptions.assignFromUser(new HashSet<TopicPartition>(partitions))) {
                this.metadata.requestUpdateForNewTopics();
            }
        }
        finally {
            this.release();
        }
    }

    @Override
    public void assign(List<TopicPartition> partitions) {
        this.assign((Collection<TopicPartition>)partitions);
    }

    @Override
    @Deprecated
    public ConsumerRecords<K, V> poll(long timeoutMs) {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            ConsumerRecords<K, V> records = this.consumerDriver.poll(timeoutMs);
            if (this.interceptors != null) {
                records = this.interceptors.onConsume(records);
            }
            return records;
        }
        return this.poll(this.time.timer(timeoutMs), false);
    }

    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        if (this.isStreams) {
            return this.poll(timeout.toMillis());
        }
        return this.poll(this.time.timer(timeout), true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) {
        this.acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                this.client.maybeTriggerWakeup();
                if (includeMetadataInTimeout) {
                    this.updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!this.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE), true)) {
                        this.log.warn("Still waiting for metadata");
                    }
                }
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.pollForFetches(timer);
                if (records.isEmpty()) continue;
                if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {
                    this.client.transmitSends();
                }
                ConsumerRecords<K, V> consumerRecords = this.interceptors.onConsume(new ConsumerRecords<K, V>(records));
                return consumerRecords;
            } while (timer.notExpired());
            ConsumerRecords consumerRecords = ConsumerRecords.empty();
            return consumerRecords;
        }
        finally {
            this.release();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
        }
    }

    boolean updateAssignmentMetadataIfNeeded(Timer timer, boolean waitForJoinGroup) {
        if (this.coordinator != null && !this.coordinator.poll(timer, waitForJoinGroup)) {
            return false;
        }
        return this.updateFetchPositions(timer);
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
        long pollTimeout = this.coordinator == null ? timer.remainingMs() : Math.min(this.coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        this.fetcher.sendFetches();
        if (!this.cachedSubscriptionHashAllFetchPositions && pollTimeout > this.retryBackoffMs) {
            pollTimeout = this.retryBackoffMs;
        }
        this.log.trace("Polling for fetches with timeout {}", (Object)pollTimeout);
        Timer pollTimer = this.time.timer(pollTimeout);
        this.client.poll(pollTimer, () -> !this.fetcher.hasAvailableFetches());
        timer.update(pollTimer.currentTimeMs());
        return this.fetcher.fetchedRecords();
    }

    @Override
    public void commitSync() {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            this.consumerDriver.commitSync();
        } else {
            this.commitSync(Duration.ofMillis(this.defaultApiTimeoutMs));
        }
    }

    @Override
    public void commitSync(Duration timeout) {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            this.consumerDriver.commitSync(timeout);
        } else {
            this.acquireAndEnsureOpen();
            try {
                this.maybeThrowInvalidGroupIdException();
                if (!this.coordinator.commitOffsetsSync(this.subscriptions.allConsumed(), this.time.timer(timeout))) {
                    throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully committing the current consumed offsets");
                }
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.size() == 0) {
            this.log.debug("commitSync called with empty offsets");
            return;
        }
        if (this.consumerDriver == null) {
            Set<TopicPartition> partitions = offsets.keySet();
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot commit");
            return;
        }
        if (this.isStreams) {
            Map<TopicPartition, Object> newoffsets = this.getNewPartitionMapWithDefaultStream(offsets);
            this.consumerDriver.commitSync(newoffsets);
        } else {
            this.commitSync(offsets, Duration.ofMillis(this.defaultApiTimeoutMs));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        if (offsets.size() == 0) {
            this.log.debug("commitSync called with empty offsets");
            return;
        }
        if (this.consumerDriver == null) {
            Set<TopicPartition> partitions = offsets.keySet();
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot commit");
            return;
        }
        if (this.isStreams) {
            Map<TopicPartition, Object> newoffsets = this.getNewPartitionMapWithDefaultStream(offsets);
            this.consumerDriver.commitSync(newoffsets);
        } else {
            this.acquireAndEnsureOpen();
            try {
                this.maybeThrowInvalidGroupIdException();
                offsets.forEach(this::updateLastSeenEpochIfNewer);
                if (!this.coordinator.commitOffsetsSync(new HashMap<TopicPartition, OffsetAndMetadata>(offsets), this.time.timer(timeout))) {
                    throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully committing offsets " + offsets);
                }
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    public void commitAsync() {
        this.commitAsync(null);
    }

    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            this.consumerDriver.commitAsync(callback);
        } else {
            this.commitAsync(this.subscriptions.allConsumed(), callback);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.maybeThrowInvalidGroupIdException();
        if (offsets.size() == 0) {
            this.log.debug("commitAsync with no offsets");
            if (callback != null) {
                callback.onComplete(offsets, null);
            }
            return;
        }
        if (this.consumerDriver == null) {
            Set<TopicPartition> partitions = offsets.keySet();
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot commit");
            return;
        }
        if (this.isStreams) {
            Map<TopicPartition, Object> newOffsets = this.getNewPartitionMapWithDefaultStream(offsets);
            this.consumerDriver.commitAsync(newOffsets, callback);
        } else {
            this.acquireAndEnsureOpen();
            try {
                this.log.debug("Committing offsets: {}", offsets);
                offsets.forEach(this::updateLastSeenEpochIfNewer);
                this.coordinator.commitOffsetsAsync(new HashMap<TopicPartition, OffsetAndMetadata>(offsets), callback);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                partition = this.getNewTopicPartitionWithDefaultStream(partition);
                if (!this.isTopicPartitionAssignedOrSubscribed(partition)) {
                    this.log.error("Partition {} is not assigned", (Object)partition);
                    throw new IllegalStateException(String.format("No current assignment for partition %s-%d", partition.topic(), partition.partition()));
                }
                this.consumerDriver.seek(partition, offset);
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            if (!this.isTopicPartitionAssignedOrSubscribed(partition)) {
                this.log.error("Partition {} is not assigned", (Object)partition);
                throw new IllegalStateException(String.format("No current assignment for partition %s-%d", partition.topic(), partition.partition()));
            }
            this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(offset, Optional.empty(), this.metadata.currentLeader(partition));
            this.subscriptions.seekUnvalidated(partition, newPosition);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        long offset = offsetAndMetadata.offset();
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                partition = this.getNewTopicPartitionWithDefaultStream(partition);
                if (!this.isTopicPartitionAssignedOrSubscribed(partition)) {
                    this.log.error("Partition {} is not assigned", (Object)partition);
                    throw new IllegalStateException(String.format("No current assignment for partition %s-%d", partition.topic(), partition.partition()));
                }
                this.consumerDriver.seek(partition, offsetAndMetadata);
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            if (offsetAndMetadata.leaderEpoch().isPresent()) {
                this.log.info("Seeking to offset {} for partition {} with epoch {}", new Object[]{offset, partition, offsetAndMetadata.leaderEpoch().get()});
            } else {
                this.log.info("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            }
            Metadata.LeaderAndEpoch currentLeaderAndEpoch = this.metadata.currentLeader(partition);
            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), currentLeaderAndEpoch);
            this.updateLastSeenEpochIfNewer(partition, offsetAndMetadata);
            this.subscriptions.seekUnvalidated(partition, newPosition);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                partitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
                for (TopicPartition tp : partitions) {
                    if (this.isTopicPartitionAssignedOrSubscribed(tp)) continue;
                    this.log.error("Partition {} is not assigned", (Object)tp);
                    throw new IllegalStateException(String.format("No current assignment for partition %s-%d", tp.topic(), tp.partition()));
                }
                this.consumerDriver.seekToBeginning(partitions);
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            this.subscriptions.requestOffsetReset(parts, OffsetResetStrategy.EARLIEST);
        }
        finally {
            this.release();
        }
    }

    @Override
    @Deprecated
    public void seekToBeginning(TopicPartition ... partitions) {
        this.seekToBeginning(Arrays.asList(partitions));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.acquireAndEnsureOpen();
            try {
                partitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
                for (TopicPartition tp : partitions) {
                    if (this.isTopicPartitionAssignedOrSubscribed(tp)) continue;
                    this.log.error("Partition {} is not assigned", (Object)tp);
                    throw new IllegalStateException(String.format("No current assignment for partition %s-%d", tp.topic(), tp.partition()));
                }
                this.consumerDriver.seekToEnd(partitions);
            }
            finally {
                this.release();
            }
        }
        this.acquireAndEnsureOpen();
        try {
            Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
            this.subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
        }
        finally {
            this.release();
        }
    }

    @Override
    @Deprecated
    public void seekToEnd(TopicPartition ... partitions) {
        this.seekToEnd(Arrays.asList(partitions));
    }

    @Override
    public long position(TopicPartition partition) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get position");
            throw new NoOffsetForPartitionException(partition);
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            return this.consumerDriver.position(partition);
        }
        return this.position(partition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override
    public long position(TopicPartition partition, Duration timeout) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get position");
            throw new NoOffsetForPartitionException(partition);
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            return this.consumerDriver.position(partition, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(partition)) {
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
            }
            Timer timer = this.time.timer(timeout);
            do {
                SubscriptionState.FetchPosition position;
                if ((position = this.subscriptions.validPosition(partition)) != null) {
                    long l = position.offset;
                    return l;
                }
                this.updateFetchPositions(timer);
                this.client.poll(timer);
            } while (timer.notExpired());
            throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position for partition " + partition + " could be determined");
        }
        finally {
            this.release();
        }
    }

    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get committed");
            throw new NoOffsetForPartitionException(partition);
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            return this.consumerDriver.committed(partition);
        }
        return this.committed(partition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
        return this.committed(Collections.singleton(partition), timeout).get(partition);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        return this.committed(partitions, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get committed");
            throw new NoOffsetForPartitionException(partitions);
        }
        if (this.isStreams) {
            partitions = this.getNewTopicPartitionWithDefaultStream(partitions);
            return this.consumerDriver.committed(partitions, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            this.maybeThrowInvalidGroupIdException();
            Map<TopicPartition, OffsetAndMetadata> offsets = this.coordinator.fetchCommittedOffsets(partitions, this.time.timer(timeout));
            if (offsets == null) {
                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.");
            }
            offsets.forEach(this::updateLastSeenEpochIfNewer);
            Map<TopicPartition, OffsetAndMetadata> map = offsets;
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        if (this.consumerDriver == null) {
            this.log.info("consumed not initialized, cannot get metrics");
            return Collections.EMPTY_MAP;
        }
        if (this.isStreams) {
            return this.consumerDriver.metrics();
        }
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(topic);
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get partitionsFor " + topic);
            return null;
        }
        if (this.isStreams) {
            topic = this.getNewTopicNameWithDefaultStream(topic);
            return this.consumerDriver.partitionsFor(topic);
        }
        return this.partitionsFor(topic, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(topic);
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get partitionsFor " + topic);
            return null;
        }
        if (this.isStreams) {
            topic = this.getNewTopicNameWithDefaultStream(topic);
            return this.consumerDriver.partitionsFor(topic, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            Cluster cluster = this.metadata.fetch();
            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
            if (!parts.isEmpty()) {
                List<PartitionInfo> list = parts;
                return list;
            }
            Timer timer = this.time.timer(timeout);
            Map<String, List<PartitionInfo>> topicMetadata = this.fetcher.getTopicMetadata(new MetadataRequest.Builder(Collections.singletonList(topic), this.metadata.allowAutoTopicCreation()), timer);
            List<PartitionInfo> list = topicMetadata.get(topic);
            return list;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        if (this.consumerDriver == null) {
            this.log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            if (this.defaultStream == null) {
                throw new KafkaException("Cannot get listTopics() without default stream name");
            }
            return this.consumerDriver.listTopics(this.defaultStream);
        }
        return this.listTopics(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        if (this.consumerDriver == null) {
            this.log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            if (this.defaultStream == null) {
                throw new KafkaException("Cannot get listTopics() without default stream name");
            }
            return this.consumerDriver.listTopics(this.defaultStream, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            Map<String, List<PartitionInfo>> map = this.fetcher.getAllTopicMetadata(this.time.timer(timeout));
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(stream + ":");
        }
        if (this.consumerDriver == null) {
            this.log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            return this.consumerDriver.listTopics(stream);
        }
        throw new KafkaException("Unsupported method for KafkaConsumer");
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream, Duration timeout) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(stream + ":");
        }
        if (this.consumerDriver == null) {
            this.log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            return this.consumerDriver.listTopics(stream, timeout);
        }
        throw new KafkaException("Unsupported method for KafkaConsumer");
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(pattern.toString() + ":");
        }
        if (this.consumerDriver == null) {
            this.log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            pattern = Pattern.compile(this.getNewTopicNameWithDefaultStream(pattern.toString()));
            return this.consumerDriver.listTopics(pattern);
        }
        throw new KafkaException("Unsupported method for KafkaConsumer");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        if (this.consumerDriver == null) {
            Iterator<TopicPartition> iterator = partitions.iterator();
            if (iterator.hasNext()) {
                this.initializeConsumer(iterator.next().topic());
            } else {
                this.log.debug("No partitions to pause.");
                return;
            }
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot pause");
            return;
        }
        if (this.isStreams) {
            partitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            this.consumerDriver.pause(partitions);
        } else {
            this.acquireAndEnsureOpen();
            try {
                this.log.debug("Pausing partitions {}", partitions);
                for (TopicPartition partition : partitions) {
                    this.subscriptions.pause(partition);
                }
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    @Deprecated
    public void pause(TopicPartition ... partitions) {
        this.pause(Arrays.asList(partitions));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        if (partitions.size() == 0) {
            this.log.debug("resuming empty partitions list");
            return;
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot resume");
            return;
        }
        if (this.isStreams) {
            partitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            this.consumerDriver.resume(partitions);
        } else {
            this.acquireAndEnsureOpen();
            try {
                this.log.debug("Resuming partitions {}", partitions);
                for (TopicPartition partition : partitions) {
                    this.subscriptions.resume(partition);
                }
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    @Deprecated
    public void resume(TopicPartition ... partitions) {
        this.resume(Arrays.asList(partitions));
    }

    @Override
    public Set<TopicPartition> paused() {
        if (this.consumerDriver == null) {
            return new HashSet<TopicPartition>();
        }
        if (this.isStreams) {
            return this.consumerDriver.paused();
        }
        this.acquireAndEnsureOpen();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
            return set;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        if (timestampsToSearch.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(timestampsToSearch.keySet().iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get offsetsForTimes");
            return new HashMap<TopicPartition, OffsetAndTimestamp>();
        }
        if (this.isStreams) {
            Map<TopicPartition, Object> newTimestampsToSearch = this.getNewPartitionMapWithDefaultStream(timestampsToSearch);
            return this.consumerDriver.offsetsForTimes(newTimestampsToSearch);
        }
        return this.offsetsForTimes(timestampsToSearch, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        if (timestampsToSearch.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(timestampsToSearch.keySet().iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get offsetsForTimes");
            return new HashMap<TopicPartition, OffsetAndTimestamp>();
        }
        if (this.isStreams) {
            Map<TopicPartition, Object> newTimestampsToSearch = this.getNewPartitionMapWithDefaultStream(timestampsToSearch);
            return this.consumerDriver.offsetsForTimes(newTimestampsToSearch, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
                if (entry.getValue() >= 0L) continue;
                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative.");
            }
            Map<TopicPartition, OffsetAndTimestamp> map = this.fetcher.offsetsForTimes(timestampsToSearch, this.time.timer(timeout));
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get endOffsets");
            return new HashMap<TopicPartition, Long>();
        }
        if (this.isStreams) {
            Collection<TopicPartition> newPartitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            return this.consumerDriver.beginningOffsets(newPartitions);
        }
        return this.beginningOffsets(partitions, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get endOffsets");
            return new HashMap<TopicPartition, Long>();
        }
        if (this.isStreams) {
            Collection<TopicPartition> newPartitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            return this.consumerDriver.beginningOffsets(newPartitions, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.fetcher.beginningOffsets(partitions, this.time.timer(timeout));
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get endOffsets");
            return new HashMap<TopicPartition, Long>();
        }
        if (this.isStreams) {
            Collection<TopicPartition> newPartitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            return this.consumerDriver.endOffsets(newPartitions);
        }
        return this.endOffsets(partitions, Duration.ofMillis(this.requestTimeoutMs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        if (partitions.isEmpty()) {
            return Collections.emptyMap();
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            this.log.error("consumer closed, cannot get endOffsets");
            return new HashMap<TopicPartition, Long>();
        }
        if (this.isStreams) {
            Collection<TopicPartition> newPartitions = this.getNewPartitionCollectionWithDefaultStream(partitions);
            return this.consumerDriver.endOffsets(newPartitions, timeout);
        }
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.fetcher.endOffsets(partitions, this.time.timer(timeout));
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public ConsumerGroupMetadata groupMetadata() {
        if (this.consumerDriver == null) {
            if (this.groupMetadataPlug == null) {
                this.maybeThrowInvalidGroupIdException();
                this.groupMetadataPlug = new ConsumerGroupMetadata(this.groupId.get());
            }
            return this.groupMetadataPlug;
        }
        if (this.isStreams) {
            return this.consumerDriver.groupMetadata();
        }
        return this.coordinator.groupMetadata();
    }

    @Override
    public void enforceRebalance() {
        this.acquireAndEnsureOpen();
        if (this.isStreams) {
            this.consumerDriver.enforceRebalance();
        } else {
            try {
                if (this.coordinator == null) {
                    throw new IllegalStateException("Tried to force a rebalance but consumer does not have a group.");
                }
                this.coordinator.requestRejoin();
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        if (this.isStreams) {
            Consumer<K, V> consumerDriverToDelete = null;
            KafkaConsumer kafkaConsumer = this;
            synchronized (kafkaConsumer) {
                if (this.isStreamsClosed) {
                    return;
                }
                this.isStreamsClosed = true;
                if (this.consumerDriver == null) {
                    return;
                }
                consumerDriverToDelete = this.consumerDriver;
                this.consumerDriver = null;
            }
            consumerDriverToDelete.close();
        } else {
            this.close(Duration.ofMillis(30000L));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Deprecated
    public void close(long timeout, TimeUnit timeUnit) {
        if (timeout < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        if (this.isStreams) {
            Consumer<K, V> consumerDriverToDelete = null;
            KafkaConsumer kafkaConsumer = this;
            synchronized (kafkaConsumer) {
                if (this.isStreamsClosed) {
                    return;
                }
                this.isStreamsClosed = true;
                if (this.consumerDriver == null) {
                    return;
                }
                consumerDriverToDelete = this.consumerDriver;
                this.consumerDriver = null;
            }
            consumerDriverToDelete.close(timeout, timeUnit);
        } else {
            this.close(Duration.ofMillis(timeUnit.toMillis(timeout)));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(Duration timeout) {
        if (timeout.toMillis() < 0L) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        if (this.isStreams) {
            Consumer<K, V> consumerDriverToDelete = null;
            KafkaConsumer kafkaConsumer = this;
            synchronized (kafkaConsumer) {
                if (this.isStreamsClosed) {
                    return;
                }
                this.isStreamsClosed = true;
                if (this.consumerDriver == null) {
                    return;
                }
                consumerDriverToDelete = this.consumerDriver;
                this.consumerDriver = null;
            }
            consumerDriverToDelete.close(timeout);
        } else {
            this.acquire();
            try {
                if (!this.closed) {
                    this.close(timeout.toMillis(), false);
                }
            }
            finally {
                this.closed = true;
                this.release();
            }
        }
    }

    @Override
    public void wakeup() {
        if (this.consumerDriver == null) {
            this.log.info("consumed not initialized, cannot wakeup");
            return;
        }
        if (this.closed || this.isStreamsClosed) {
            this.log.error("Consumer closed, cannot wake up.");
            return;
        }
        if (this.isStreams) {
            this.consumerDriver.wakeup();
        } else {
            this.client.wakeup();
        }
    }

    private ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?> ... candidateLists) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> candidateList : candidateLists) {
            clusterResourceListeners.maybeAddAll(candidateList);
        }
        clusterResourceListeners.maybeAdd(keyDeserializer);
        clusterResourceListeners.maybeAdd(valueDeserializer);
        return clusterResourceListeners;
    }

    private void close(long timeoutMs, boolean swallowException) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        try {
            if (this.coordinator != null) {
                this.coordinator.close(this.time.timer(Math.min(timeoutMs, this.requestTimeoutMs)));
            }
        }
        catch (Throwable t) {
            firstException.compareAndSet(null, t);
            this.log.error("Failed to close coordinator", t);
        }
        Utils.closeQuietly(this.fetcher, "fetcher", firstException);
        Utils.closeQuietly(this.interceptors, "consumer interceptors", firstException);
        Utils.closeQuietly(this.kafkaConsumerMetrics, "kafka consumer metrics", firstException);
        Utils.closeQuietly(this.metrics, "consumer metrics", firstException);
        Utils.closeQuietly(this.client, "consumer network client", firstException);
        Utils.closeQuietly(this.keyDeserializer, "consumer key deserializer", firstException);
        Utils.closeQuietly(this.valueDeserializer, "consumer value deserializer", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable exception = firstException.get();
        if (exception != null && !swallowException) {
            if (exception instanceof InterruptException) {
                throw (InterruptException)exception;
            }
            throw new KafkaException("Failed to close kafka consumer", exception);
        }
    }

    private boolean updateFetchPositions(Timer timer) {
        this.fetcher.validateOffsetsIfNeeded();
        this.cachedSubscriptionHashAllFetchPositions = this.subscriptions.hasAllFetchPositions();
        if (this.cachedSubscriptionHashAllFetchPositions) {
            return true;
        }
        if (this.coordinator != null && !this.coordinator.refreshCommittedOffsetsIfNeeded(timer)) {
            return false;
        }
        this.subscriptions.resetInitializingPositions();
        this.fetcher.resetOffsetsIfNeeded();
        return true;
    }

    private void acquireAndEnsureOpen() {
        this.acquire();
        if (this.closed) {
            this.release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    private void throwIfNoAssignorsConfigured() {
        if (this.assignors.isEmpty()) {
            throw new IllegalStateException("Must configure at least one partition assigner class name to partition.assignment.strategy configuration property");
        }
    }

    private void maybeThrowInvalidGroupIdException() {
        if (!this.groupId.isPresent()) {
            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
        }
    }

    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        if (offsetAndMetadata != null) {
            offsetAndMetadata.leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(topicPartition, (int)epoch));
        }
    }

    private boolean isTopicPartitionAssignedOrSubscribed(TopicPartition topicPartition) {
        if (this.isAssigned(topicPartition)) {
            return true;
        }
        for (String clusterName : MapRTopicUtils.listClusterNames()) {
            if (!this.isAssigned(new TopicPartition("/mapr/" + clusterName + topicPartition.topic(), topicPartition.partition()))) continue;
            return true;
        }
        return false;
    }

    private boolean isAssigned(TopicPartition topicPartition) {
        String topic = topicPartition.topic();
        return this.subscriptions.isAssigned(topicPartition) || this.subscriptions.subscription().contains(topic) || this.subscriptions.matchesSubscribedPattern(topic);
    }

    String getClientId() {
        return this.clientId;
    }

    boolean updateAssignmentMetadataIfNeeded(Timer timer) {
        return this.updateAssignmentMetadataIfNeeded(timer, true);
    }
}

