/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerWakeupException;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.Coordinator;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapRKafkaConsumer<K, V>
implements Consumer<K, V> {
    /** Deserializer for record keys; passed to the constructor or instantiated from "key.deserializer". */
    private final Deserializer<K> keyDeserializer;
    /** Deserializer for record values; passed to the constructor or instantiated from "value.deserializer". */
    private final Deserializer<V> valueDeserializer;
    /** Id of the thread currently inside an acquire()/release() pair, or null when no thread holds the consumer. */
    private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
    /** Number of acquire() calls not yet matched by release(); lets the owning thread re-enter. */
    private int refcount = 0;
    /** Upper bound, in milliseconds, handed to the timed pollFuture() when waiting on a single request. */
    private long requestTimeoutMs = 5000L;
    private static final Logger log = LoggerFactory.getLogger(MapRKafkaConsumer.class);
    /** Same value as the timestamp resetOffset() passes to listOffset() for the EARLIEST strategy. */
    private static final long EARLIEST_OFFSET_TIMESTAMP = -2L;
    /** Same value as the timestamp resetOffset() passes to listOffset() for the LATEST strategy. */
    private static final long LATEST_OFFSET_TIMESTAMP = -1L;
    /** Source of the numeric suffix for generated client ids ("consumer-N") when "client.id" is empty. */
    private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
    private final Coordinator coordinator;
    private final Fetcher<K, V> fetcher;
    private final Time time;
    private final NetworkClient client;
    private final Metrics metrics;
    private final SubscriptionState subscriptions;
    private final Metadata metadata;
    /** Value of "retry.backoff.ms"; used for sleeps and poll timeouts between retries. */
    private final long retryBackoffMs;
    /** Value of "enable.auto.commit". */
    private final boolean autoCommit;
    /** Value of "auto.commit.interval.ms". */
    private final long autoCommitIntervalMs;
    /** Callback invoked around partition reassignment; never replaced after construction. */
    private final ConsumerRebalanceCallback rebalanceCallback;
    /** Clock reading taken at construction and at every commit(Map, CommitType) call. */
    private long lastCommitAttemptMs;
    /** Set to true by close(boolean); checked by ensureNotClosed() and close(). */
    private boolean closed = false;
    /** Raised by wakeup() and consumed by pollClient(), which then throws ConsumerWakeupException. */
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    /**
     * Builds a consumer from the supplied configuration.
     *
     * <p>Any failure during construction closes whatever was already created (swallowing
     * close errors) and is rethrown wrapped in a {@link KafkaException}.
     *
     * @param config            consumer configuration to read all settings from
     * @param callback          rebalance callback; when {@code null} the one named by
     *                          "rebalance.callback.class" is instantiated
     * @param keyDeserializer   key deserializer; when {@code null} the one named by
     *                          "key.deserializer" is instantiated and configured
     * @param valueDeserializer value deserializer; when {@code null} the one named by
     *                          "value.deserializer" is instantiated and configured
     * @throws KafkaException if any step of the construction fails
     */
    public MapRKafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        try {
            log.debug("Starting the Kafka consumer");
            this.rebalanceCallback = callback == null ? config.getConfiguredInstance("rebalance.callback.class", ConsumerRebalanceCallback.class) : callback;
            this.time = new SystemTime();
            this.autoCommit = config.getBoolean("enable.auto.commit");
            this.autoCommitIntervalMs = config.getLong("auto.commit.interval.ms");
            this.lastCommitAttemptMs = this.time.milliseconds();
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS);
            String clientId = config.getString("client.id");
            String jmxPrefix = "kafka.consumer";
            // No explicit client id configured: generate a unique "consumer-N" one.
            if (clientId.length() <= 0) {
                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            }
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            reporters.add(new JmxReporter(jmxPrefix));
            this.metrics = new Metrics(metricConfig, reporters, this.time);
            this.retryBackoffMs = config.getLong("retry.backoff.ms");
            this.metadata = new Metadata(this.retryBackoffMs, config.getLong("metadata.max.age.ms"));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"));
            this.metadata.update(Cluster.bootstrap(addresses), 0L);
            String metricGrpPrefix = "consumer";
            LinkedHashMap<String, String> metricsTags = new LinkedHashMap<String, String>();
            metricsTags.put("client-id", clientId);
            this.client = new NetworkClient(new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time, metricGrpPrefix, metricsTags), this.metadata, clientId, 100, config.getLong("reconnect.backoff.ms"), config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"));
            // Upper-case with a fixed locale: under a Turkish default locale "earliest" would
            // otherwise become "EARLİEST" and fail the enum lookup.
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase(java.util.Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            this.coordinator = new Coordinator(this.client, config.getString("group.id"), config.getInt("session.timeout.ms"), config.getString("partition.assignment.strategy"), this.subscriptions, this.metrics, metricGrpPrefix, metricsTags, this.time);
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance("key.deserializer", Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                // Caller-supplied deserializers are used as-is and are not configured here.
                this.keyDeserializer = keyDeserializer;
            }
            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance("value.deserializer", Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                this.valueDeserializer = valueDeserializer;
            }
            this.fetcher = new Fetcher<K, V>(this.client, config.getInt("fetch.min.bytes"), config.getInt("fetch.max.wait.ms"), config.getInt("max.partition.fetch.bytes"), config.getBoolean("check.crcs"), this.keyDeserializer, this.valueDeserializer, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix, metricsTags, this.time);
            config.logUnused();
            log.debug("Kafka consumer created");
        }
        catch (Throwable t) {
            // Release whatever was created so far; close errors are swallowed so that the
            // original construction failure is the one reported.
            this.close(true);
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

    /**
     * Returns the partitions currently assigned to this consumer.
     *
     * @return an unmodifiable wrapper around the assigned-partition set of the subscription state
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public Set<TopicPartition> subscriptions() {
        this.acquire();
        try {
            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
        } finally {
            this.release();
        }
    }

    /**
     * Subscribes to the given topics and registers them with the metadata.
     *
     * @param topics topic names to subscribe to
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void subscribe(String ... topics) {
        this.acquire();
        try {
            final String joinedTopics = Utils.join(topics, ", ");
            log.debug("Subscribed to topic(s): {}", (Object)joinedTopics);
            for (int i = 0; i < topics.length; ++i) {
                this.subscriptions.subscribe(topics[i]);
            }
            this.metadata.addTopics(topics);
        } finally {
            this.release();
        }
    }

    /**
     * Subscribes to specific partitions; the topic of each partition is also added to the metadata.
     *
     * @param partitions partitions to subscribe to
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void subscribe(TopicPartition ... partitions) {
        this.acquire();
        try {
            final String joinedPartitions = Utils.join(partitions, ", ");
            log.debug("Subscribed to partitions(s): {}", (Object)joinedPartitions);
            for (int i = 0; i < partitions.length; ++i) {
                final TopicPartition requested = partitions[i];
                this.subscriptions.subscribe(requested);
                this.metadata.addTopics(requested.topic());
            }
        } finally {
            this.release();
        }
    }

    /**
     * Removes the given topics from the subscription state.
     *
     * @param topics topic names to unsubscribe from
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void unsubscribe(String ... topics) {
        this.acquire();
        try {
            final String joinedTopics = Utils.join(topics, ", ");
            log.debug("Unsubscribed from topic(s): {}", (Object)joinedTopics);
            for (int i = 0; i < topics.length; ++i) {
                this.subscriptions.unsubscribe(topics[i]);
            }
        } finally {
            this.release();
        }
    }

    /**
     * Removes the given partitions from the subscription state.
     *
     * @param partitions partitions to unsubscribe from
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void unsubscribe(TopicPartition ... partitions) {
        this.acquire();
        try {
            final String joinedPartitions = Utils.join(partitions, ", ");
            log.debug("Unsubscribed from partitions(s): {}", (Object)joinedPartitions);
            for (int i = 0; i < partitions.length; ++i) {
                this.subscriptions.unsubscribe(partitions[i]);
            }
        } finally {
            this.release();
        }
    }

    /**
     * Fetches records, waiting up to roughly {@code timeout} milliseconds for some to arrive.
     *
     * <p>Each iteration polls once, bounded by the remaining time, the time to the next
     * auto-commit and the time to the next heartbeat. When records come back, the next round
     * of fetches is sent out immediately before returning. When nothing comes back the loop
     * backs off for at most {@code retryBackoffMs} before trying again.
     *
     * @param timeout maximum time to wait, in milliseconds; must not be negative
     * @return the fetched records, or an empty {@link ConsumerRecords} if none arrived in time
     * @throws IllegalArgumentException if {@code timeout} is negative
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        this.acquire();
        try {
            if (timeout < 0L) {
                throw new IllegalArgumentException("Timeout must not be negative");
            }
            long budget = timeout;
            while (budget >= 0L) {
                final long begin = this.time.milliseconds();
                final long waitMs = Utils.min(budget, this.timeToNextCommit(begin), this.coordinator.timeToNextHeartbeat(begin));
                final Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = this.pollOnce(waitMs, begin);
                final long finish = this.time.milliseconds();
                if (!fetched.isEmpty()) {
                    // Pipeline the next fetches before handing the records back.
                    this.fetcher.initFetches(this.metadata.fetch(), finish);
                    this.pollClient(0L, finish);
                    return new ConsumerRecords<K, V>(fetched);
                }
                budget -= finish - begin;
                if (budget > 0L) {
                    // Nothing was available: back off before retrying, then charge the sleep
                    // against the remaining budget.
                    Utils.sleep(Utils.min(budget, this.retryBackoffMs));
                    budget -= this.time.milliseconds() - finish;
                }
            }
            return ConsumerRecords.empty();
        } finally {
            this.release();
        }
    }

    /**
     * Runs a single poll cycle: group maintenance, position updates, optional auto-commit,
     * sending fetches and one network poll.
     *
     * @param timeout time in milliseconds handed to the network poll
     * @param now     current time in milliseconds
     * @return whatever records the fetcher has accumulated after the poll
     */
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout, long now) {
        // Metadata snapshot is taken up front, before any reassignment can run.
        final Cluster snapshot = this.metadata.fetch();
        final boolean autoAssigned = this.subscriptions.partitionsAutoAssigned();
        if (autoAssigned && this.subscriptions.partitionAssignmentNeeded()) {
            this.reassignPartitions(now);
        } else if (autoAssigned) {
            this.coordinator.maybeHeartbeat(now);
        }
        if (!this.subscriptions.hasAllFetchPositions()) {
            this.updateFetchPositions(this.subscriptions.missingFetchPositions());
        }
        if (this.shouldAutoCommit(now)) {
            this.commit(CommitType.ASYNC);
        }
        this.fetcher.initFetches(snapshot, now);
        this.pollClient(timeout, now);
        return this.fetcher.fetchedRecords();
    }

    /**
     * Commits the given offsets and records the time of this commit attempt.
     *
     * <p>For {@link CommitType#ASYNC} the subscription state is first flagged as needing a
     * refresh of its committed offsets.
     *
     * @param offsets    offsets to commit, keyed by partition
     * @param commitType whether to commit asynchronously or synchronously
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
        this.acquire();
        try {
            final String typeName = commitType.toString().toLowerCase();
            log.debug("Committing offsets ({}): {} ", (Object)typeName, offsets);
            this.lastCommitAttemptMs = this.time.milliseconds();
            final boolean async = commitType == CommitType.ASYNC;
            if (async) {
                this.subscriptions.needRefreshCommits();
            }
            this.commitOffsets(offsets, commitType);
        } finally {
            this.release();
        }
    }

    /**
     * Commits every offset the subscription state reports via {@code allConsumed()}.
     *
     * @param commitType whether to commit asynchronously or synchronously
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void commit(CommitType commitType) {
        this.acquire();
        try {
            final Map<TopicPartition, Long> consumedOffsets = this.subscriptions.allConsumed();
            this.commit(consumedOffsets, commitType);
        } finally {
            this.release();
        }
    }

    /**
     * Moves the position of a partition to the given offset in the subscription state.
     *
     * @param partition partition to reposition
     * @param offset    offset to seek to
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        this.acquire();
        try {
            log.debug("Seeking to offset {} for partition {}", (Object)offset, (Object)partition);
            this.subscriptions.seek(partition, offset);
        } finally {
            this.release();
        }
    }

    /**
     * Flags partitions for an offset reset with the EARLIEST strategy.
     *
     * @param partitions partitions to reset; when empty, all assigned partitions are used
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void seekToBeginning(TopicPartition ... partitions) {
        this.acquire();
        try {
            final Collection<TopicPartition> targets;
            if (partitions.length == 0) {
                targets = this.subscriptions.assignedPartitions();
            } else {
                targets = Arrays.asList(partitions);
            }
            for (TopicPartition target : targets) {
                this.subscriptions.needOffsetReset(target, OffsetResetStrategy.EARLIEST);
            }
        } finally {
            this.release();
        }
    }

    /**
     * Flags partitions for an offset reset with the LATEST strategy.
     *
     * @param partitions partitions to reset; when empty, all assigned partitions are used
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void seekToEnd(TopicPartition ... partitions) {
        this.acquire();
        try {
            final Collection<TopicPartition> targets;
            if (partitions.length == 0) {
                targets = this.subscriptions.assignedPartitions();
            } else {
                targets = Arrays.asList(partitions);
            }
            for (TopicPartition target : targets) {
                this.subscriptions.needOffsetReset(target, OffsetResetStrategy.LATEST);
            }
        } finally {
            this.release();
        }
    }

    /**
     * Returns the consumed position of an assigned partition, resolving it first if unknown.
     *
     * @param partition partition to look up; must be assigned to this consumer
     * @return the consumed offset recorded for the partition
     * @throws IllegalArgumentException if the partition is not assigned to this consumer
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public long position(TopicPartition partition) {
        this.acquire();
        try {
            final boolean assigned = this.subscriptions.assignedPartitions().contains(partition);
            if (!assigned) {
                throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
            }
            final Long known = this.subscriptions.consumed(partition);
            if (known != null) {
                return known;
            }
            // No position yet: resolve it, then read it back.
            this.updateFetchPositions(Collections.singleton(partition));
            return this.subscriptions.consumed(partition);
        } finally {
            this.release();
        }
    }

    /**
     * Returns the last committed offset of a partition.
     *
     * <p>For an assigned partition the locally known value is returned when present;
     * otherwise committed offsets are refreshed for all assigned partitions. For a partition
     * that is not assigned, only that partition is refreshed.
     *
     * @param partition partition to look up
     * @return the committed offset
     * @throws NoOffsetForPartitionException if no committed offset is known after the refresh
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public long committed(TopicPartition partition) {
        this.acquire();
        try {
            final Set<TopicPartition> toRefresh;
            if (this.subscriptions.assignedPartitions().contains(partition)) {
                final Long known = this.subscriptions.committed(partition);
                if (known != null) {
                    return known;
                }
                toRefresh = this.subscriptions.assignedPartitions();
            } else {
                toRefresh = Collections.singleton(partition);
            }
            this.refreshCommittedOffsets(toRefresh);
            final Long refreshed = this.subscriptions.committed(partition);
            if (refreshed == null) {
                throw new NoOffsetForPartitionException("No offset has been committed for partition " + partition);
            }
            return refreshed;
        } finally {
            this.release();
        }
    }

    /**
     * Exposes the metrics registered with this consumer.
     *
     * <p>Unlike most public methods this one does not go through acquire()/release().
     *
     * @return an unmodifiable wrapper around the metrics registry's map
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Looks up the partitions of a topic, forcing a metadata update when they are not yet known.
     *
     * @param topic topic to look up
     * @return the partition list from the current cluster metadata; may still be {@code null}
     *         if the refreshed metadata does not contain the topic
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        this.acquire();
        try {
            final List<PartitionInfo> cached = this.metadata.fetch().partitionsForTopic(topic);
            if (cached != null) {
                return cached;
            }
            // Unknown topic: register it, wait for fresh metadata, and look again.
            this.metadata.add(topic);
            this.awaitMetadataUpdate();
            return this.metadata.fetch().partitionsForTopic(topic);
        } finally {
            this.release();
        }
    }

    /**
     * Closes the consumer; does nothing if it is already closed.
     *
     * @throws KafkaException if closing one of the underlying resources failed
     * @throws ConcurrentModificationException if another thread is currently using the consumer
     */
    @Override
    public void close() {
        if (!this.closed) {
            this.acquire();
            try {
                this.close(false);
            } finally {
                this.release();
            }
        }
    }

    /**
     * Raises the wakeup flag and wakes the network client.
     *
     * <p>The flag is picked up by the next pollClient() call, which then throws
     * {@link ConsumerWakeupException}. This method does not go through acquire()/release().
     */
    @Override
    public void wakeup() {
        this.wakeup.set(true);
        this.client.wakeup();
    }

    /**
     * Marks the consumer closed and closes metrics, network client and both deserializers.
     *
     * <p>Every resource is closed via {@code ClientUtils.closeQuietly}, which collects the
     * first failure into the reference passed to it.
     *
     * @param swallowException when {@code true}, a collected close failure is not rethrown
     * @throws KafkaException if a close failure was collected and {@code swallowException} is false
     */
    private void close(boolean swallowException) {
        log.trace("Closing the Kafka consumer.");
        final AtomicReference<Throwable> firstFailure = new AtomicReference<Throwable>();
        this.closed = true;
        ClientUtils.closeQuietly(this.metrics, "consumer metrics", firstFailure);
        ClientUtils.closeQuietly(this.client, "consumer network client", firstFailure);
        ClientUtils.closeQuietly(this.keyDeserializer, "consumer key deserializer", firstFailure);
        ClientUtils.closeQuietly(this.valueDeserializer, "consumer value deserializer", firstFailure);
        log.debug("The Kafka consumer has closed.");
        final Throwable failure = firstFailure.get();
        if (failure != null && !swallowException) {
            throw new KafkaException("Failed to close kafka consumer", failure);
        }
    }

    /**
     * Tells whether an automatic commit is due.
     *
     * @param now current time in milliseconds
     * @return {@code true} when auto-commit is enabled and at least one commit interval has
     *         passed since the last commit attempt
     */
    private boolean shouldAutoCommit(long now) {
        if (!this.autoCommit) {
            return false;
        }
        return this.lastCommitAttemptMs <= now - this.autoCommitIntervalMs;
    }

    /**
     * Computes how long until the next automatic commit is due.
     *
     * @param now current time in milliseconds
     * @return {@code Long.MAX_VALUE} when auto-commit is disabled, {@code 0} when the interval
     *         has already been exceeded, otherwise the milliseconds left in the interval
     */
    private long timeToNextCommit(long now) {
        if (!this.autoCommit) {
            return Long.MAX_VALUE;
        }
        final long elapsed = now - this.lastCommitAttemptMs;
        return elapsed > this.autoCommitIntervalMs ? 0L : this.autoCommitIntervalMs - elapsed;
    }

    /**
     * Requests a metadata update and polls the client until the metadata version changes.
     *
     * <p>Each poll waits at most {@code retryBackoffMs}; there is no overall time limit.
     */
    private void awaitMetadataUpdate() {
        final int versionBefore = this.metadata.requestUpdate();
        boolean unchanged = true;
        while (unchanged) {
            final long now = this.time.milliseconds();
            this.pollClient(this.retryBackoffMs, now);
            unchanged = this.metadata.version() == versionBefore;
        }
    }

    /**
     * Performs a partition reassignment, notifying the rebalance callback before and after.
     *
     * <p>Exceptions thrown by the user callback are logged and otherwise ignored so that they
     * cannot abort the reassignment.
     *
     * @param now current time in milliseconds (not used by this method)
     */
    private void reassignPartitions(long now) {
        log.debug("Revoking previously assigned partitions {}", this.subscriptions.assignedPartitions());
        try {
            this.rebalanceCallback.onPartitionsRevoked(this, this.subscriptions.assignedPartitions());
        } catch (Exception revokeFailure) {
            final String callbackName = this.rebalanceCallback.getClass().getName();
            log.error("User provided callback " + callbackName + " failed on partition revocation: ", (Throwable)revokeFailure);
        }

        this.assignPartitions();

        log.debug("Setting newly assigned partitions {}", this.subscriptions.assignedPartitions());
        try {
            this.rebalanceCallback.onPartitionsAssigned(this, this.subscriptions.assignedPartitions());
        } catch (Exception assignFailure) {
            final String callbackName = this.rebalanceCallback.getClass().getName();
            log.error("User provided callback " + callbackName + " failed on partition assignment: ", (Throwable)assignFailure);
        }
    }

    /**
     * Establishes a fetch position for every given partition that does not have one yet.
     *
     * <p>After refreshing committed offsets, each partition without a fetched position is
     * either reset (when a reset is pending or no committed offset exists) or positioned at
     * its committed offset.
     *
     * @param partitions partitions to process
     */
    private void updateFetchPositions(Set<TopicPartition> partitions) {
        this.refreshCommittedOffsets(partitions);
        for (TopicPartition candidate : partitions) {
            if (this.subscriptions.fetched(candidate) != null) {
                // Already has a fetch position.
                continue;
            }
            if (this.subscriptions.isOffsetResetNeeded(candidate)) {
                this.resetOffset(candidate);
            } else if (this.subscriptions.committed(candidate) == null) {
                // Nothing committed: fall back to the reset strategy.
                this.subscriptions.needOffsetReset(candidate);
                this.resetOffset(candidate);
            } else {
                log.debug("Resetting offset for partition {} to the committed offset {}", (Object)candidate, (Object)this.subscriptions.committed(candidate));
                this.subscriptions.seek(candidate, this.subscriptions.committed(candidate));
            }
        }
    }

    /**
     * Resets a partition's position according to its reset strategy.
     *
     * @param partition partition to reset
     * @throws NoOffsetForPartitionException if the strategy is neither EARLIEST nor LATEST
     */
    private void resetOffset(TopicPartition partition) {
        final OffsetResetStrategy strategy = this.subscriptions.resetStrategy(partition);
        final boolean earliest = strategy == OffsetResetStrategy.EARLIEST;
        if (!earliest && strategy != OffsetResetStrategy.LATEST) {
            throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined");
        }
        // Timestamp passed to listOffset(): -2 is used for EARLIEST, -1 for LATEST.
        final long timestamp = earliest ? -2L : -1L;
        log.debug("Resetting offset for partition {} to {} offset.", (Object)partition, (Object)strategy.name().toLowerCase());
        final long resolved = this.listOffset(partition, timestamp);
        this.subscriptions.seek(partition, resolved);
    }

    /**
     * Asks the fetcher for the offset at the given timestamp, retrying until it succeeds.
     *
     * <p>Each attempt waits up to {@code requestTimeoutMs}; an attempt that has not completed
     * by then is abandoned and a new request is issued. Failed attempts go through
     * handleRequestFailure(), which either throws or performs the retry action.
     *
     * @param partition partition to query
     * @param timestamp timestamp to pass to the fetcher
     * @return the offset delivered by the successful request
     */
    private long listOffset(TopicPartition partition, long timestamp) {
        while (true) {
            final RequestFuture<Long> pending = this.fetcher.listOffset(partition, timestamp);
            if (!pending.isDone()) {
                this.pollFuture(pending, this.requestTimeoutMs);
            }
            if (pending.isDone()) {
                if (pending.succeeded()) {
                    return pending.value();
                }
                this.handleRequestFailure(pending);
            }
        }
    }

    /**
     * Re-fetches committed offsets for the given partitions when the subscription state asks
     * for a refresh, and stores each returned value.
     *
     * @param partitions partitions whose committed offsets should be fetched
     */
    private void refreshCommittedOffsets(Set<TopicPartition> partitions) {
        if (!this.subscriptions.refreshCommitsNeeded()) {
            return;
        }
        final Map<TopicPartition, Long> fetched = this.fetchCommittedOffsets(partitions);
        for (Map.Entry<TopicPartition, Long> committed : fetched.entrySet()) {
            this.subscriptions.committed(committed.getKey(), committed.getValue());
        }
    }

    /**
     * Waits for outstanding coordinator requests, then repeats the assignment request until
     * the subscription state no longer needs an assignment.
     *
     * <p>Each request is awaited without a time limit; failures go through
     * handleRequestFailure() before the next attempt.
     */
    private void assignPartitions() {
        this.awaitCoordinatorInFlightRequests();
        while (this.subscriptions.partitionAssignmentNeeded()) {
            final RequestFuture<Void> pending = this.coordinator.assignPartitions(this.time.milliseconds());
            if (!pending.isDone()) {
                this.pollFuture(pending);
            }
            if (pending.failed()) {
                this.handleRequestFailure(pending);
            }
        }
    }

    /**
     * Repeats coordinator discovery until the coordinator is known.
     *
     * <p>Each discovery request is awaited for up to {@code requestTimeoutMs}; failures go
     * through handleRequestFailure() before the next attempt.
     */
    private void ensureCoordinatorKnown() {
        while (this.coordinator.coordinatorUnknown()) {
            final RequestFuture<Void> pending = this.coordinator.discoverConsumerCoordinator();
            if (!pending.isDone()) {
                this.pollFuture(pending, this.requestTimeoutMs);
            }
            if (pending.failed()) {
                this.handleRequestFailure(pending);
            }
        }
    }

    /**
     * Polls the client until the coordinator reports no in-flight requests.
     *
     * <p>Each poll is issued with a timeout of {@code -1}.
     */
    public void awaitCoordinatorInFlightRequests() {
        while (this.coordinator.hasInFlightRequests()) {
            this.pollClient(-1L, this.time.milliseconds());
        }
    }

    /**
     * Fetches committed offsets from the coordinator, retrying until a request succeeds.
     *
     * <p>Each attempt waits up to {@code requestTimeoutMs}; an attempt that has not completed
     * by then is abandoned and a new request is issued. Failed attempts go through
     * handleRequestFailure().
     *
     * @param partitions partitions to fetch committed offsets for
     * @return the offsets delivered by the successful request
     */
    private Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        while (true) {
            final long now = this.time.milliseconds();
            final RequestFuture<Map<TopicPartition, Long>> pending = this.coordinator.fetchOffsets(partitions, now);
            if (!pending.isDone()) {
                this.pollFuture(pending, this.requestTimeoutMs);
            }
            if (pending.isDone()) {
                if (pending.succeeded()) {
                    return pending.value();
                }
                this.handleRequestFailure(pending);
            }
        }
    }

    /**
     * Dispatches a commit to the asynchronous or synchronous implementation.
     *
     * @param offsets    offsets to commit, keyed by partition
     * @param commitType {@link CommitType#ASYNC} selects the asynchronous path; anything else
     *                   the synchronous one
     */
    private void commitOffsets(Map<TopicPartition, Long> offsets, CommitType commitType) {
        if (commitType == CommitType.ASYNC) {
            this.commitOffsetsAsync(offsets);
            return;
        }
        this.commitOffsetsSync(offsets);
    }

    /**
     * Sends a commit request without waiting for it to complete.
     *
     * <p>The request is only re-sent when it comes back already completed and unsuccessful;
     * in that case handleRequestFailure() runs first (and may throw).
     *
     * @param offsets offsets to commit, keyed by partition
     */
    private void commitOffsetsAsync(Map<TopicPartition, Long> offsets) {
        while (true) {
            final long now = this.time.milliseconds();
            final RequestFuture<Void> pending = this.coordinator.commitOffsets(offsets, now);
            if (!pending.isDone() || pending.succeeded()) {
                // Either still in flight or already successful: nothing more to do here.
                return;
            }
            this.handleRequestFailure(pending);
        }
    }

    /**
     * Commits offsets and blocks until a commit request succeeds.
     *
     * <p>Each attempt waits up to {@code requestTimeoutMs}; an attempt that has not completed
     * by then is abandoned and a new request is issued. Failed attempts go through
     * handleRequestFailure().
     *
     * @param offsets offsets to commit, keyed by partition
     */
    private void commitOffsetsSync(Map<TopicPartition, Long> offsets) {
        while (true) {
            final long now = this.time.milliseconds();
            final RequestFuture<Void> pending = this.coordinator.commitOffsets(offsets, now);
            if (!pending.isDone()) {
                this.pollFuture(pending, this.requestTimeoutMs);
            }
            if (pending.isDone()) {
                if (pending.succeeded()) {
                    return;
                }
                this.handleRequestFailure(pending);
            }
        }
    }

    /**
     * Reacts to a failed request: rethrows its exception if it carries one, otherwise performs
     * the retry action the future asks for.
     *
     * @param future the failed request
     */
    private void handleRequestFailure(RequestFuture<?> future) {
        if (future.hasException()) {
            throw future.exception();
        }
        switch (future.retryAction()) {
            case BACKOFF:
                Utils.sleep(this.retryBackoffMs);
                break;
            case POLL:
                this.pollClient(this.retryBackoffMs, this.time.milliseconds());
                break;
            case FIND_COORDINATOR:
                this.ensureCoordinatorKnown();
                break;
            case REFRESH_METADATA:
                this.awaitMetadataUpdate();
                break;
            default:
                // Any other action requires no preparation before the caller retries.
                break;
        }
    }

    /**
     * Polls the client until the future completes or the time budget is used up.
     *
     * @param future  request to wait for
     * @param timeout time budget in milliseconds; polling stops once the remaining budget
     *                drops below zero
     */
    private void pollFuture(RequestFuture<?> future, long timeout) {
        long budget = timeout;
        while (!future.isDone() && budget >= 0L) {
            final long begin = this.time.milliseconds();
            this.pollClient(budget, begin);
            if (future.isDone()) {
                return;
            }
            // Charge the time spent in this poll against the budget.
            budget -= this.time.milliseconds() - begin;
        }
    }

    /**
     * Polls the client until the future completes, with no time limit.
     *
     * <p>Each poll is issued with a timeout of {@code -1}.
     *
     * @param future request to wait for
     */
    private void pollFuture(RequestFuture<?> future) {
        while (!future.isDone()) {
            this.pollClient(-1L, this.time.milliseconds());
        }
    }

    /**
     * Polls the network client once and then honours a pending wakeup request.
     *
     * @param timeout timeout in milliseconds handed to the network client
     * @param now     current time in milliseconds
     * @throws ConsumerWakeupException if the wakeup flag was set; the flag is cleared first
     */
    private void pollClient(long timeout, long now) {
        this.client.poll(timeout, now);
        // Clear the flag and throw only when it was set.
        if (this.wakeup.compareAndSet(true, false)) {
            throw new ConsumerWakeupException();
        }
    }

    /**
     * Guards against use after close.
     *
     * @throws IllegalStateException if the consumer has already been closed
     */
    private void ensureNotClosed() {
        if (!this.closed) {
            return;
        }
        throw new IllegalStateException("This consumer has already been closed.");
    }

    /**
     * Claims the consumer for the calling thread, allowing re-entry by the thread that
     * already holds it.
     *
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if a different thread currently holds the consumer
     */
    private void acquire() {
        this.ensureNotClosed();
        final Long callerId = Thread.currentThread().getId();
        final boolean alreadyOwner = callerId.equals(this.currentThread.get());
        // Not the owner yet: the claim only succeeds if nobody holds the consumer.
        if (!alreadyOwner && !this.currentThread.compareAndSet(null, callerId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        this.refcount++;
    }

    /**
     * Undoes one acquire(); once the count reaches zero the owning-thread marker is cleared
     * so another thread may claim the consumer.
     */
    private void release() {
        this.refcount--;
        if (this.refcount == 0) {
            this.currentThread.set(null);
        }
    }
}

