/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.ConsumerReadTask;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Per-instance state wrapped around a single Kafka {@link Consumer}.
 *
 * <p>The state owns the consumer, the offsets consumed/committed through it, a queue of records
 * fetched by heartbeat polls, and a daemon heartbeat thread that periodically polls the consumer
 * while it is subscribed. Access to the consumer is serialised through a fair
 * {@link ReentrantReadWriteLock}: read tasks and administrative operations take the write lock,
 * the heartbeat thread only ever <em>tries</em> the read lock so it never blocks behind them.
 *
 * <p>Instances are ordered by expiration time (see {@link #compareTo(ConsumerState)}).
 *
 * <p>NOTE(review): the constructor calls the overridable {@link #getKeyDeserializer()} /
 * {@link #getValueDeserializer()} and starts the heartbeat thread before subclass construction
 * has finished; subclasses must not rely on their own fields in those methods.
 *
 * @param <KafkaK> key type produced by the Kafka deserializer
 * @param <KafkaV> value type produced by the Kafka deserializer
 * @param <ClientK> key type returned by {@link #convertConsumerRecord(ConsumerRecord)}
 * @param <ClientV> value type returned by {@link #convertConsumerRecord(ConsumerRecord)}
 */
public abstract class ConsumerState<KafkaK, KafkaV, ClientK, ClientV>
implements Comparable<ConsumerState>,
AutoCloseable,
ConsumerRebalanceListener {
    private static final Logger log = LoggerFactory.getLogger(ConsumerState.class);

    /** Session timeout assumed when the consumer properties do not set {@code session.timeout.ms}. */
    private static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
    /** The heartbeat delay is the session timeout divided by this factor. */
    private static final int HEARTBEATS_PER_SESSION_TIMEOUT = 4;
    /** Timeout, in milliseconds, of the poll issued by a heartbeat. */
    private static final long HEARTBEAT_POLL_TIMEOUT_MS = 100L;
    /** Pause, in milliseconds, of the heartbeat thread when no heartbeat could be scheduled. */
    private static final long HEARTBEAT_IDLE_BACKOFF_MS = 100L;

    private Consumer<KafkaK, KafkaV> consumer;
    private final AtomicBoolean isSubscribed;
    private final Map<TopicPartition, OffsetAndMetadata> consumedOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> committedOffsets;
    private final Queue<ConsumerRecord<KafkaK, KafkaV>> recordsQueue;
    protected KafkaRestConfig config;
    private final ConsumerInstanceId instanceId;
    private long expiration;
    private final ReadWriteLock lock;
    // Written by readers (finishRead) and by the heartbeat thread, read by the heartbeat thread
    // without holding the lock; volatile guarantees visibility and atomic 64-bit access.
    private volatile long nextHeartbeatTime;
    private final long heartbeatDelay;
    private final ConsumerHeartbeatThread heartbeatThread;
    private ConsumerReadTask failedTask;

    /**
     * Creates the consumer through {@code consumerFactory} and starts the heartbeat thread.
     *
     * @param config REST proxy configuration; supplies the clock and {@code consumer.instance.timeout.ms}
     * @param instanceId identifier of this consumer instance
     * @param consumerProperties properties handed to the consumer factory; {@code session.timeout.ms}
     *     (if present) determines the heartbeat delay
     * @param consumerFactory factory used to create the underlying consumer
     * @throws NumberFormatException if {@code session.timeout.ms} is set but is not an integer
     */
    public ConsumerState(KafkaRestConfig config, ConsumerInstanceId instanceId, Properties consumerProperties, ConsumerManager.ConsumerFactory consumerFactory) {
        this.config = config;
        this.instanceId = instanceId;
        // Parse the timeout before the consumer exists so a malformed value cannot leak a consumer.
        this.heartbeatDelay = ConsumerState.heartbeatDelayFor(consumerProperties);
        this.lock = new ReentrantReadWriteLock(true);
        this.recordsQueue = new LinkedList<>();
        this.isSubscribed = new AtomicBoolean(false);
        this.consumedOffsets = new HashMap<>();
        this.committedOffsets = new HashMap<>();
        this.consumer = consumerFactory.createConsumer(consumerProperties, this.getKeyDeserializer(), this.getValueDeserializer());
        this.expiration = config.getTime().milliseconds() + (long)config.getInt("consumer.instance.timeout.ms");
        this.nextHeartbeatTime = 0L;
        this.heartbeatThread = new ConsumerHeartbeatThread();
        this.heartbeatThread.start();
    }

    /**
     * Derives the heartbeat delay from the consumer's session timeout.
     *
     * <p>The delay is a quarter of the session timeout so that several heartbeats fit into one
     * session window, both for the default and for an explicitly configured timeout.
     */
    private static long heartbeatDelayFor(Properties consumerProperties) {
        String configuredTimeout = consumerProperties.getProperty("session.timeout.ms");
        int sessionTimeoutMs = configuredTimeout == null
                ? DEFAULT_SESSION_TIMEOUT_MS
                : Integer.parseInt(configuredTimeout.trim());
        return sessionTimeoutMs / HEARTBEATS_PER_SESSION_TIMEOUT;
    }

    /** Returns the identifier of this consumer instance. */
    public ConsumerInstanceId getId() {
        return this.instanceId;
    }

    /** Returns the underlying consumer, or {@code null} once {@link #close()} has completed. */
    public Consumer<KafkaK, KafkaV> getConsumer() {
        return this.consumer;
    }

    /** Returns the deserializer used for record keys; invoked from the constructor. */
    protected abstract Deserializer<KafkaK> getKeyDeserializer();

    /** Returns the deserializer used for record values; invoked from the constructor. */
    protected abstract Deserializer<KafkaV> getValueDeserializer();

    /** Converts a raw Kafka record into the client-facing representation. */
    public abstract ConsumerRecordAndSize<ClientK, ClientV> convertConsumerRecord(ConsumerRecord<KafkaK, KafkaV> var1);

    /**
     * Acquires the write lock for a read operation. Every call must be paired with
     * {@link #finishRead()} on the same thread.
     */
    public void startRead() {
        this.lock.writeLock().lock();
    }

    /** Schedules the next heartbeat one heartbeat delay from now and releases the write lock. */
    public void finishRead() {
        this.nextHeartbeatTime = this.config.getTime().milliseconds() + this.heartbeatDelay;
        this.lock.writeLock().unlock();
    }

    /** Returns the stored failed task (possibly {@code null}) and forgets it. */
    public ConsumerReadTask clearFailedTask() {
        ConsumerReadTask t = this.failedTask;
        this.failedTask = null;
        return t;
    }

    /** Remembers a read task that failed. */
    public void setFailedTask(ConsumerReadTask failedTask) {
        this.failedTask = failedTask;
    }

    /**
     * Returns the live, mutable map of consumed offsets. The same map instance is returned for
     * the whole lifetime of this state.
     */
    public Map<TopicPartition, OffsetAndMetadata> getConsumedOffsets() {
        return this.consumedOffsets;
    }

    /**
     * Synchronously commits the consumed offsets and reports them.
     *
     * @return one entry per consumed partition
     */
    public List<TopicPartitionOffset> commitOffsets() {
        // getOffsets takes the write lock itself.
        return this.getOffsets(true);
    }

    /**
     * Polls the consumer once if a heartbeat is due.
     *
     * <p>Does nothing when the consumer is closed, not subscribed, the heartbeat is not yet due,
     * or the read lock cannot be acquired immediately. Records returned by the poll are appended
     * to {@link #queue()}.
     */
    public void sendHeartbeat() {
        if (this.consumer == null || !this.isSubscribed.get()) {
            return;
        }
        if (this.config.getTime().milliseconds() < this.nextHeartbeatTime) {
            return;
        }
        // Never block: a reader or close() holding the write lock takes precedence.
        if (!this.lock.readLock().tryLock()) {
            return;
        }
        try {
            // Re-check under the lock; close() may have run between the first check and tryLock.
            Consumer<KafkaK, KafkaV> current = this.consumer;
            if (current == null) {
                return;
            }
            log.info("Consumer {} sends heartbeat.", this.instanceId.getInstance());
            ConsumerRecords<KafkaK, KafkaV> records = current.poll(HEARTBEAT_POLL_TIMEOUT_MS);
            this.nextHeartbeatTime = this.config.getTime().milliseconds() + this.heartbeatDelay;
            for (ConsumerRecord<KafkaK, KafkaV> record : records) {
                this.recordsQueue.add(record);
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /** Returns the clock time (ms) at which the next heartbeat becomes due. */
    public long getNextHeartbeatTime() {
        return this.nextHeartbeatTime;
    }

    /** Returns the live queue of records collected by heartbeat polls. */
    public Queue<ConsumerRecord<KafkaK, KafkaV>> queue() {
        return this.recordsQueue;
    }

    /**
     * Stops the heartbeat thread and closes the consumer. Calling it again after a successful
     * close is a no-op.
     */
    @Override
    public void close() {
        Consumer<KafkaK, KafkaV> current = this.consumer;
        if (current == null) {
            return;
        }
        // Abort any poll in progress so the write lock below becomes available promptly.
        current.wakeup();
        this.lock.writeLock().lock();
        try {
            this.heartbeatThread.shutdown();
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }
        finally {
            // Release the lock (the previous code re-acquired it here and never released it).
            this.lock.writeLock().unlock();
        }
    }

    /** Returns {@code true} if the expiration time is at or before {@code nowMs}. */
    public boolean expired(long nowMs) {
        return this.expiration <= nowMs;
    }

    /** Pushes the expiration time to now plus {@code consumer.instance.timeout.ms}. */
    public void updateExpiration() {
        this.expiration = this.config.getTime().milliseconds() + (long)this.config.getInt("consumer.instance.timeout.ms");
    }

    /** Returns the milliseconds remaining until expiration; non-positive once expired. */
    public long untilExpiration(long nowMs) {
        return this.expiration - nowMs;
    }

    public KafkaRestConfig getConfig() {
        return this.config;
    }

    public void setConfig(KafkaRestConfig config) {
        this.config = config;
    }

    /** Orders states by ascending expiration time. */
    @Override
    public int compareTo(ConsumerState o) {
        return Long.compare(this.expiration, o.expiration);
    }

    /** Returns whether a subscription is currently active. */
    public boolean isSubscribed() {
        return this.isSubscribed.get();
    }

    /** Returns the consumer's current subscription, or an empty set when not subscribed. */
    public Set<String> getSubscribedTopics() {
        if (!this.isSubscribed.get()) {
            return Collections.emptySet();
        }
        this.lock.writeLock().lock();
        try {
            return this.consumer.subscription();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Subscribes to the given topics with this state as rebalance listener.
     * Throws {@code Errors.consumerAlreadySubscribedException()} if a subscription is already active.
     */
    public void tryToSubscribeByTopicList(List<String> topics) {
        this.lock.writeLock().lock();
        try {
            if (this.isSubscribed.get()) {
                throw Errors.consumerAlreadySubscribedException();
            }
            this.consumer.subscribe(topics, (ConsumerRebalanceListener)this);
            this.isSubscribed.set(true);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Subscribes to all topics matching {@code regex} with this state as rebalance listener.
     * Throws {@code Errors.consumerAlreadySubscribedException()} if a subscription is already active.
     */
    public void tryToSubscribeByTopicRegex(String regex) {
        this.lock.writeLock().lock();
        try {
            if (this.isSubscribed.get()) {
                throw Errors.consumerAlreadySubscribedException();
            }
            this.consumer.subscribe(Pattern.compile(regex), (ConsumerRebalanceListener)this);
            this.isSubscribed.set(true);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /** Drops the subscription, the stored failed task and all tracked offsets. */
    public void unsubscribe() {
        this.lock.writeLock().lock();
        try {
            this.clearFailedTask();
            this.consumer.unsubscribe();
            this.isSubscribed.set(false);
            // Clear rather than null out: the maps are used again by later subscriptions,
            // commits and rebalance callbacks.
            this.committedOffsets.clear();
            this.consumedOffsets.clear();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Builds one {@link TopicPartitionOffset} per consumed partition.
     *
     * @param doCommit when {@code true}, records {@code consumed offset + 1} as the committed
     *     position of every consumed partition and commits synchronously; the reported committed
     *     offset is then the consumed offset. When {@code false}, reports the previously recorded
     *     committed offset, or {@code -1} if there is none.
     */
    private List<TopicPartitionOffset> getOffsets(boolean doCommit) {
        List<TopicPartitionOffset> result = new ArrayList<>();
        this.lock.writeLock().lock();
        try {
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.consumedOffsets.entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata consumed = entry.getValue();
                long offset = consumed.offset();
                long committedOffset;
                if (doCommit) {
                    this.committedOffsets.put(tp, new OffsetAndMetadata(offset + 1L, consumed.metadata()));
                    committedOffset = offset;
                } else {
                    OffsetAndMetadata committed = this.committedOffsets.get(tp);
                    committedOffset = committed == null ? -1L : committed.offset();
                }
                result.add(new TopicPartitionOffset(tp.topic(), tp.partition(), offset, committedOffset));
            }
            if (doCommit) {
                this.consumer.commitSync(this.committedOffsets);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return result;
    }

    /** Logs each revoked partition and forgets its consumed and committed offsets. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            log.info("Consumer: {} Revoked: {}-{}", this.instanceId.toString(), tp.topic(), tp.partition());
            this.consumedOffsets.remove(tp);
            this.committedOffsets.remove(tp);
        }
    }

    /** Logs each newly assigned partition. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            log.info("Consumer: {} Assigned: {}-{}", this.instanceId.toString(), tp.topic(), tp.partition());
        }
    }

    /**
     * Daemon thread that repeatedly calls {@link ConsumerState#sendHeartbeat()} and sleeps until
     * the next heartbeat is due.
     */
    private class ConsumerHeartbeatThread
    extends Thread {
        private final AtomicBoolean isRunning;
        private final CountDownLatch shutdownLatch;

        public ConsumerHeartbeatThread() {
            super("Consumer " + ConsumerState.this.instanceId.getInstance() + " Heartbeat Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (this.isRunning.get()) {
                    // Default pause when no heartbeat time lies in the future (not subscribed,
                    // lock busy, or the heartbeat failed); prevents a busy spin.
                    long wait = HEARTBEAT_IDLE_BACKOFF_MS;
                    try {
                        ConsumerState.this.sendHeartbeat();
                        long untilNext = ConsumerState.this.nextHeartbeatTime - ConsumerState.this.config.getTime().milliseconds();
                        if (untilNext > 0L) {
                            wait = untilNext;
                        }
                    }
                    catch (Exception e) {
                        log.warn("Heartbeat exception " + ConsumerState.this.instanceId.getInstance(), (Throwable)e);
                    }
                    this.pause(wait);
                }
            }
            finally {
                // Always release shutdown(), even if the loop dies with an Error.
                this.shutdownLatch.countDown();
            }
        }

        /** Waits up to {@code millis} (must be positive) or until {@link #shutdown()} wakes the thread. */
        private void pause(long millis) {
            try {
                synchronized (this) {
                    // Checked under the monitor so a concurrent shutdown() notify cannot be missed.
                    if (this.isRunning.get()) {
                        this.wait(millis);
                    }
                }
            }
            catch (InterruptedException e) {
                // Expected during shutdown; only worth reporting if nobody asked us to stop.
                if (this.isRunning.get()) {
                    log.warn("Heartbeat exception " + ConsumerState.this.instanceId.getInstance(), (Throwable)e);
                }
            }
        }

        /**
         * Asks the thread to stop and blocks until it has left its run loop.
         *
         * @throws Error if the calling thread is interrupted while waiting; the interrupt flag is
         *     restored and the interruption is attached as the cause
         */
        public void shutdown() {
            this.isRunning.set(false);
            this.interrupt();
            synchronized (this) {
                this.notify();
            }
            try {
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new Error("Interrupted when shutting down consumer heartbeat thread.", e);
            }
        }
    }
}

