/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Unstable
public class KafkaConsumer<K, V>
implements Consumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private static final long NO_CURRENT_THREAD = -1L;
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private static final String JMX_PREFIX = "kafka.consumer";
    private String clientId;
    private ConsumerCoordinator coordinator;
    private Deserializer<K> keyDeserializer;
    private Deserializer<V> valueDeserializer;
    private Fetcher<K, V> fetcher;
    private Time time;
    private ConsumerNetworkClient client;
    private Metrics metrics;
    private SubscriptionState subscriptions;
    private Metadata metadata;
    private long retryBackoffMs;
    private long requestTimeoutMs;
    private boolean closed = false;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refcount = new AtomicInteger(0);
    private final ConsumerConfig config;
    private boolean isStreams = false;
    private boolean isStreamsClosed = false;
    private Consumer<K, V> consumerDriver = null;
    private String defaultStream = null;

    public KafkaConsumer(Map<String, Object> configs) {
        this(configs, null, null);
    }

    public KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    public KafkaConsumer(Properties properties) {
        this(properties, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    public KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer);
    }

    private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        log.debug("Starting the Kafka consumer");
        this.config = config;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.closed = false;
        this.isStreams = false;
        this.isStreamsClosed = false;
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance("key.deserializer", Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore("key.deserializer");
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance("value.deserializer", Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore("value.deserializer");
            this.valueDeserializer = valueDeserializer;
        }
        this.defaultStream = null;
        try {
            this.defaultStream = config.getString("streams.consumer.default.stream");
            if (this.defaultStream == "") {
                this.defaultStream = null;
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (this.defaultStream != null) {
            this.initializeConsumer(this.defaultStream + ":");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initializeConsumer(String topic) {
        KafkaConsumer kafkaConsumer = this;
        synchronized (kafkaConsumer) {
            if (this.isStreamsClosed) {
                log.error("cannot initialize consumer. already closed.");
                return;
            }
            if (this.consumerDriver != null) {
                log.debug("initialized consumer already.");
                return;
            }
            if (topic.startsWith("/") || topic.contains(":")) {
                GenericHFactory consumerFactory = new GenericHFactory();
                Consumer ac = (Consumer)consumerFactory.getImplementorInstance("com.mapr.streams.impl.listener.MarlinListener", new Object[]{this.config, this.keyDeserializer, this.valueDeserializer}, ConsumerConfig.class, Deserializer.class, Deserializer.class);
                this.isStreams = true;
                this.consumerDriver = ac;
            } else {
                this.isStreams = false;
                this.consumerDriver = this;
                List<InetSocketAddress> kafkaaddresses = ClientUtils.parseAndValidateAddresses(this.config.getList("bootstrap.servers"));
                if (kafkaaddresses.size() == 0 || kafkaaddresses.get(0).equals("")) {
                    throw new KafkaException("Bootstrap servers not specified in configuration");
                }
                try {
                    log.debug("Starting the Kafka consumer");
                    this.requestTimeoutMs = this.config.getInt("request.timeout.ms").intValue();
                    int sessionTimeOutMs = this.config.getInt("session.timeout.ms");
                    int fetchMaxWaitMs = this.config.getInt("fetch.max.wait.ms");
                    if (this.requestTimeoutMs <= (long)sessionTimeOutMs || this.requestTimeoutMs <= (long)fetchMaxWaitMs) {
                        throw new ConfigException("request.timeout.ms should be greater than session.timeout.ms and fetch.max.wait.ms");
                    }
                    this.time = new SystemTime();
                    MetricConfig metricConfig = new MetricConfig().samples(this.config.getInt("metrics.num.samples")).timeWindow(this.config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
                    this.clientId = this.config.getString("client.id");
                    if (this.clientId.length() <= 0) {
                        this.clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
                    }
                    List<MetricsReporter> reporters = this.config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
                    reporters.add(new JmxReporter(JMX_PREFIX));
                    this.metrics = new Metrics(metricConfig, reporters, this.time);
                    this.retryBackoffMs = this.config.getLong("retry.backoff.ms");
                    this.metadata = new Metadata(this.retryBackoffMs, this.config.getLong("metadata.max.age.ms"));
                    List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(this.config.getList("bootstrap.servers"));
                    this.metadata.update(Cluster.bootstrap(addresses), 0L);
                    String metricGrpPrefix = "consumer";
                    LinkedHashMap<String, String> metricsTags = new LinkedHashMap<String, String>();
                    metricsTags.put("client-id", this.clientId);
                    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.config.values());
                    NetworkClient netClient = new NetworkClient((Selectable)new Selector(this.config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, metricsTags, channelBuilder), this.metadata, this.clientId, 100, (long)this.config.getLong("reconnect.backoff.ms"), (int)this.config.getInt("send.buffer.bytes"), (int)this.config.getInt("receive.buffer.bytes"), (int)this.config.getInt("request.timeout.ms"), this.time);
                    this.client = new ConsumerNetworkClient(netClient, this.metadata, this.time, this.retryBackoffMs);
                    OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(this.config.getString("auto.offset.reset").toUpperCase());
                    this.subscriptions = new SubscriptionState(offsetResetStrategy);
                    List<PartitionAssignor> assignors = this.config.getConfiguredInstances("partition.assignment.strategy", PartitionAssignor.class);
                    this.coordinator = new ConsumerCoordinator(this.client, this.config.getString("group.id"), this.config.getInt("session.timeout.ms"), this.config.getInt("heartbeat.interval.ms"), assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, metricsTags, this.time, this.retryBackoffMs, new ConsumerCoordinator.DefaultOffsetCommitCallback(), this.config.getBoolean("enable.auto.commit"), this.config.getLong("auto.commit.interval.ms"));
                    if (this.keyDeserializer == null) {
                        this.keyDeserializer = this.config.getConfiguredInstance("key.deserializer", Deserializer.class);
                        this.keyDeserializer.configure(this.config.originals(), true);
                    } else {
                        this.config.ignore("key.deserializer");
                        this.keyDeserializer = this.keyDeserializer;
                    }
                    if (this.valueDeserializer == null) {
                        this.valueDeserializer = this.config.getConfiguredInstance("value.deserializer", Deserializer.class);
                        this.valueDeserializer.configure(this.config.originals(), false);
                    } else {
                        this.config.ignore("value.deserializer");
                        this.valueDeserializer = this.valueDeserializer;
                    }
                    this.fetcher = new Fetcher<K, V>(this.client, this.config.getInt("fetch.min.bytes"), this.config.getInt("fetch.max.wait.ms"), this.config.getInt("max.partition.fetch.bytes"), this.config.getBoolean("check.crcs"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, metricsTags, this.time, this.retryBackoffMs);
                    this.config.logUnused();
                    AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId);
                    log.debug("Kafka consumer created");
                }
                catch (Throwable t) {
                    this.close(true);
                    throw new KafkaException("Failed to construct kafka consumer", t);
                }
            }
        }
    }

    private boolean useDefaultStreamName(String topicname) {
        return !topicname.startsWith("/");
    }

    private String addDefaultStreamNameToTopicName(String topicname) {
        return this.defaultStream + ":" + topicname;
    }

    private TopicPartition addDefaultStreamNameToTopicPartition(TopicPartition tp) {
        return new TopicPartition(this.addDefaultStreamNameToTopicName(tp.topic()), tp.partition());
    }

    private TopicPartition getNewTopicPartitionWithDefaultStream(TopicPartition tp) {
        if (this.defaultStream != null && this.useDefaultStreamName(tp.topic())) {
            return this.addDefaultStreamNameToTopicPartition(tp);
        }
        return tp;
    }

    private String getNewTopicNameWithDefaultStream(String topic) {
        if (this.defaultStream != null && this.useDefaultStreamName(topic)) {
            return this.addDefaultStreamNameToTopicName(topic);
        }
        return topic;
    }

    private boolean checkIfPartitionsNeedDefaultStream(Collection<TopicPartition> partitions) {
        boolean needDefault = false;
        if (this.defaultStream != null) {
            for (TopicPartition tp : partitions) {
                if (!this.useDefaultStreamName(tp.topic())) continue;
                needDefault = true;
                break;
            }
        }
        return needDefault;
    }

    private boolean checkIfTopicsNeedDefaultStream(Collection<String> topics) {
        boolean needDefault = false;
        if (this.defaultStream != null) {
            for (String topic : topics) {
                if (!this.useDefaultStreamName(topic)) continue;
                needDefault = true;
                break;
            }
        }
        return needDefault;
    }

    private List<String> getNewTopicListWithDefaultStream(List<String> topics) {
        if (this.checkIfTopicsNeedDefaultStream(topics)) {
            ArrayList<String> newTopics = new ArrayList<String>(topics.size());
            for (String topic : topics) {
                if (this.useDefaultStreamName(topic)) {
                    topic = this.addDefaultStreamNameToTopicName(topic);
                }
                newTopics.add(topic);
            }
            return newTopics;
        }
        return topics;
    }

    private List<TopicPartition> getNewPartitionListWithDefaultStream(List<TopicPartition> partitions) {
        if (this.checkIfPartitionsNeedDefaultStream(partitions)) {
            ArrayList<TopicPartition> newPartitions = new ArrayList<TopicPartition>(partitions.size());
            for (TopicPartition partition : partitions) {
                if (this.useDefaultStreamName(partition.topic())) {
                    partition = this.addDefaultStreamNameToTopicPartition(partition);
                }
                newPartitions.add(partition);
            }
            return newPartitions;
        }
        return partitions;
    }

    private Map<TopicPartition, OffsetAndMetadata> getNewPartitionMapWithDefaultStream(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (this.checkIfPartitionsNeedDefaultStream(offsets.keySet())) {
            HashMap<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                if (this.useDefaultStreamName(tp.topic())) {
                    tp = this.addDefaultStreamNameToTopicPartition(tp);
                }
                newOffsets.put(tp, entry.getValue());
            }
            return newOffsets;
        }
        return offsets;
    }

    private void updatePartitionArrayWithDefaultStream(TopicPartition ... partitions) {
        if (this.defaultStream != null) {
            for (int i = 0; i < partitions.length; ++i) {
                if (!this.useDefaultStreamName(partitions[i].topic())) continue;
                partitions[i] = this.addDefaultStreamNameToTopicPartition(partitions[i]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<TopicPartition> assignment() {
        if (this.consumerDriver == null) {
            return new HashSet<TopicPartition>();
        }
        if (this.isStreams) {
            return this.consumerDriver.assignment();
        }
        this.acquire();
        try {
            Set<TopicPartition> set = Collections.unmodifiableSet(new HashSet<TopicPartition>(this.subscriptions.assignedPartitions()));
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<String> subscription() {
        if (this.consumerDriver == null) {
            return new HashSet<String>();
        }
        if (this.isStreams) {
            return this.consumerDriver.subscription();
        }
        this.acquire();
        try {
            Set<String> set = Collections.unmodifiableSet(new HashSet<String>(this.subscriptions.subscription()));
            return set;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
        if (topics.size() == 0) {
            log.debug("Subscribing to empty topics list");
            return;
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(topics.get(0));
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            topics = this.getNewTopicListWithDefaultStream(topics);
            this.consumerDriver.subscribe(topics, listener);
        } else {
            this.acquire();
            try {
                if (topics.isEmpty()) {
                    this.unsubscribe();
                } else {
                    log.debug("Subscribed to topic(s): {}", (Object)Utils.join(topics, ", "));
                    this.subscriptions.subscribe(topics, listener);
                    this.metadata.setTopics(this.subscriptions.groupSubscription());
                }
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    public void subscribe(List<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(pattern.toString());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            pattern = Pattern.compile(this.getNewTopicNameWithDefaultStream(pattern.toString()));
            this.consumerDriver.subscribe(pattern, listener);
        } else {
            this.acquire();
            try {
                log.debug("Subscribed to pattern: {}", (Object)pattern);
                this.subscriptions.subscribe(pattern, listener);
                this.metadata.needMetadataForAllTopics(true);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unsubscribe() {
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            this.consumerDriver.unsubscribe();
        } else {
            this.acquire();
            try {
                log.debug("Unsubscribed all topics or patterns and assigned partitions");
                this.subscriptions.unsubscribe();
                this.coordinator.maybeLeaveGroup();
                this.metadata.needMetadataForAllTopics(false);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void assign(List<TopicPartition> partitions) {
        if (partitions.size() == 0) {
            log.debug("assigning empty partitions list");
            return;
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions.get(0).topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        if (this.isStreams) {
            partitions = this.getNewPartitionListWithDefaultStream(partitions);
            this.consumerDriver.assign(partitions);
        } else {
            this.acquire();
            try {
                log.debug("Subscribed to partition(s): {}", (Object)Utils.join(partitions, ", "));
                this.subscriptions.assignFromUser(partitions);
                HashSet<String> topics = new HashSet<String>();
                for (TopicPartition tp : partitions) {
                    topics.add(tp.topic());
                }
                this.metadata.setTopics(topics);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            return this.consumerDriver.poll(timeout);
        }
        this.acquire();
        try {
            long elapsed;
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
            long start = this.time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
                if ((records = this.pollOnce(remaining)).isEmpty()) continue;
                this.fetcher.initFetches(this.metadata.fetch());
                this.client.quickPoll();
                ConsumerRecords<K, V> consumerRecords = new ConsumerRecords<K, V>(records);
                return consumerRecords;
            } while ((remaining = timeout - (elapsed = this.time.milliseconds() - start)) > 0L);
            ConsumerRecords consumerRecords = ConsumerRecords.empty();
            return consumerRecords;
        }
        finally {
            this.release();
        }
    }

    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        this.coordinator.ensureCoordinatorKnown();
        if (this.subscriptions.partitionsAutoAssigned()) {
            this.coordinator.ensurePartitionAssignment();
        }
        if (!this.subscriptions.hasAllFetchPositions()) {
            this.updateFetchPositions(this.subscriptions.missingFetchPositions());
        }
        Cluster cluster = this.metadata.fetch();
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = this.fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }
        this.fetcher.initFetches(cluster);
        this.client.poll(timeout);
        return this.fetcher.fetchedRecords();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitSync() {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            this.consumerDriver.commitSync();
        } else {
            this.acquire();
            try {
                this.commitSync(this.subscriptions.allConsumed());
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.size() == 0) {
            log.debug("commitSync called with empty offsets");
            return;
        }
        if (this.consumerDriver == null) {
            Set<TopicPartition> partitions = offsets.keySet();
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot commit");
            return;
        }
        if (this.isStreams) {
            Map<TopicPartition, OffsetAndMetadata> newoffsets = this.getNewPartitionMapWithDefaultStream(offsets);
            this.consumerDriver.commitSync(newoffsets);
        } else {
            this.acquire();
            try {
                this.coordinator.commitOffsetsSync(offsets);
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    public void commitAsync() {
        this.commitAsync(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        if (this.consumerDriver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        if (this.isStreams) {
            this.consumerDriver.commitAsync(callback);
        } else {
            this.acquire();
            try {
                this.commitAsync(this.subscriptions.allConsumed(), callback);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        if (offsets.size() == 0) {
            log.debug("commitAsync with no offsets");
            callback.onComplete(offsets, null);
            return;
        }
        if (this.consumerDriver == null) {
            Set<TopicPartition> partitions = offsets.keySet();
            this.initializeConsumer(partitions.iterator().next().topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot commit");
            return;
        }
        if (this.isStreams) {
            Map<TopicPartition, OffsetAndMetadata> newOffsets = this.getNewPartitionMapWithDefaultStream(offsets);
            this.consumerDriver.commitAsync(newOffsets, callback);
        } else {
            this.acquire();
            try {
                log.debug("Committing offsets: {} ", offsets);
                this.coordinator.commitOffsetsAsync(new HashMap<TopicPartition, OffsetAndMetadata>(offsets), callback);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        if (offset < 0L) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            this.consumerDriver.seek(partition, offset);
        } else {
            this.acquire();
            try {
                log.debug("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
                this.subscriptions.seek(partition, offset);
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToBeginning(TopicPartition ... partitions) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions[0].topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.updatePartitionArrayWithDefaultStream(partitions);
            this.consumerDriver.seekToBeginning(partitions);
        } else {
            this.acquire();
            try {
                Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() : Arrays.asList(partitions);
                for (TopicPartition tp : parts) {
                    log.debug("Seeking to beginning of partition {}", (Object)tp);
                    this.subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
                }
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seekToEnd(TopicPartition ... partitions) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions[0].topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        if (this.isStreams) {
            this.updatePartitionArrayWithDefaultStream(partitions);
            this.consumerDriver.seekToEnd(partitions);
        } else {
            this.acquire();
            try {
                Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions() : Arrays.asList(partitions);
                for (TopicPartition tp : parts) {
                    log.debug("Seeking to end of partition {}", (Object)tp);
                    this.subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
                }
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public long position(TopicPartition partition) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot get position");
            throw new NoOffsetForPartitionException(partition);
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            return this.consumerDriver.position(partition);
        }
        this.acquire();
        try {
            if (!this.subscriptions.isAssigned(partition)) {
                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
            }
            Long offset = this.subscriptions.position(partition);
            if (offset == null) {
                this.updateFetchPositions(Collections.singleton(partition));
                offset = this.subscriptions.position(partition);
            }
            long l = offset;
            return l;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partition.topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot get committed");
            throw new NoOffsetForPartitionException(partition);
        }
        if (this.isStreams) {
            partition = this.getNewTopicPartitionWithDefaultStream(partition);
            return this.consumerDriver.committed(partition);
        }
        this.acquire();
        try {
            OffsetAndMetadata committed;
            if (this.subscriptions.isAssigned(partition)) {
                committed = this.subscriptions.committed(partition);
                if (committed == null) {
                    this.coordinator.refreshCommittedOffsetsIfNeeded();
                    committed = this.subscriptions.committed(partition);
                }
            } else {
                Map<TopicPartition, OffsetAndMetadata> offsets = this.coordinator.fetchCommittedOffsets(Collections.singleton(partition));
                committed = offsets.get(partition);
            }
            OffsetAndMetadata offsetAndMetadata = committed;
            return offsetAndMetadata;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        if (this.consumerDriver == null) {
            log.info("consumed not initialized, cannot get metrics");
            return null;
        }
        if (this.isStreams) {
            return this.consumerDriver.metrics();
        }
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(topic);
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot get partitionsFor " + topic);
            return null;
        }
        if (this.isStreams) {
            topic = this.getNewTopicNameWithDefaultStream(topic);
            return this.consumerDriver.partitionsFor(topic);
        }
        this.acquire();
        try {
            Cluster cluster = this.metadata.fetch();
            List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
            if (parts != null) {
                List<PartitionInfo> list = parts;
                return list;
            }
            Map<String, List<PartitionInfo>> topicMetadata = this.fetcher.getTopicMetadata(Collections.singletonList(topic), this.requestTimeoutMs);
            List<PartitionInfo> list = topicMetadata.get(topic);
            return list;
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        if (this.consumerDriver == null) {
            log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            if (this.defaultStream == null) {
                throw new KafkaException("Cannot get listTopics() without default stream name");
            }
            return this.consumerDriver.listTopics(this.defaultStream);
        }
        this.acquire();
        try {
            Map<String, List<PartitionInfo>> map = this.fetcher.getAllTopicMetadata(this.requestTimeoutMs);
            return map;
        }
        finally {
            this.release();
        }
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(stream + ":");
        }
        if (this.consumerDriver == null) {
            log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            return this.consumerDriver.listTopics(stream);
        }
        throw new KafkaException("Unsupported method for KafkaConsumer");
    }

    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(pattern.toString() + ":");
        }
        if (this.consumerDriver == null) {
            log.info("consumer closed or not initialized, cannot listTopics");
            return new HashMap<String, List<PartitionInfo>>();
        }
        if (this.isStreams) {
            pattern = Pattern.compile(this.getNewTopicNameWithDefaultStream(pattern.toString()));
            return this.consumerDriver.listTopics(pattern);
        }
        throw new KafkaException("Unsupported method for KafkaConsumer");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void pause(TopicPartition ... partitions) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions[0].topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot pause");
            return;
        }
        if (this.isStreams) {
            this.updatePartitionArrayWithDefaultStream(partitions);
            this.consumerDriver.pause(partitions);
        } else {
            this.acquire();
            try {
                for (TopicPartition partition : partitions) {
                    log.debug("Pausing partition {}", (Object)partition);
                    this.subscriptions.pause(partition);
                }
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void resume(TopicPartition ... partitions) {
        if (this.consumerDriver == null) {
            this.initializeConsumer(partitions[0].topic());
        }
        if (this.consumerDriver == null) {
            log.error("consumer closed, cannot resume");
            return;
        }
        if (this.isStreams) {
            this.updatePartitionArrayWithDefaultStream(partitions);
            this.consumerDriver.resume(partitions);
        } else {
            this.acquire();
            try {
                for (TopicPartition partition : partitions) {
                    log.debug("Resuming partition {}", (Object)partition);
                    this.subscriptions.resume(partition);
                }
            }
            finally {
                this.release();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Consumer<K, V> consumerDriverToDelete = null;
        KafkaConsumer kafkaConsumer = this;
        synchronized (kafkaConsumer) {
            if (this.isStreamsClosed) {
                return;
            }
            this.isStreamsClosed = true;
            if (this.consumerDriver == null) {
                return;
            }
            consumerDriverToDelete = this.consumerDriver;
            this.consumerDriver = null;
        }
        if (this.isStreams) {
            consumerDriverToDelete.close();
        } else {
            this.acquire();
            try {
                if (this.closed) {
                    return;
                }
                this.close(false);
            }
            finally {
                this.release();
            }
        }
    }

    @Override
    public void wakeup() {
        if (this.consumerDriver == null) {
            log.info("consumed not initialized, cannot wakeup");
            return;
        }
        if (this.closed || this.isStreamsClosed) {
            log.error("Consumer closed, cannot wake up.");
            return;
        }
        if (this.isStreams) {
            this.consumerDriver.wakeup();
        } else {
            this.client.wakeup();
        }
    }

    private void close(boolean swallowException) {
        log.trace("Closing the Kafka consumer.");
        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
        this.closed = true;
        ClientUtils.closeQuietly(this.coordinator, "coordinator", firstException);
        ClientUtils.closeQuietly(this.metrics, "consumer metrics", firstException);
        ClientUtils.closeQuietly(this.client, "consumer network client", firstException);
        ClientUtils.closeQuietly(this.keyDeserializer, "consumer key deserializer", firstException);
        ClientUtils.closeQuietly(this.valueDeserializer, "consumer value deserializer", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId);
        log.debug("The Kafka consumer has closed.");
        if (firstException.get() != null && !swallowException) {
            throw new KafkaException("Failed to close kafka consumer", firstException.get());
        }
    }

    private void updateFetchPositions(Set<TopicPartition> partitions) {
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        this.fetcher.updateFetchPositions(partitions);
    }

    private void ensureNotClosed() {
        if (this.closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        this.ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }
}

