/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.Time;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

/**
 * Reads records for a single topic partition through an already-assigned
 * {@link Consumer}, optionally keeping surplus records returned by a poll in a
 * small per-partition cache so that a follow-up request for the next offsets
 * can be answered without polling again.
 *
 * <p>Caching is enabled when {@code simpleconsumer.max.caches.num} is positive.
 * At most that many per-partition caches are kept; when a new one is needed and
 * the limit is reached, the cache that was created earliest is dropped.
 */
public class SimpleConsumerRecordsCache {
    private final KafkaRestConfig config;
    private final int maxCachesNum;
    private final int maxPollTime;
    private final Time time;
    /** Partitions in the order their caches were created; head is the next eviction candidate. */
    private final Queue<TopicPartition> oldestCache;
    private final ConcurrentMap<TopicPartition, CachePerTopicPartition> highLevelCache;

    /**
     * Creates the cache holder.
     *
     * @param config source of {@code simpleconsumer.max.caches.num},
     *               {@code simpleconsumer.max.poll.time} and the {@link Time} instance
     */
    public SimpleConsumerRecordsCache(KafkaRestConfig config) {
        this.config = config;
        this.maxCachesNum = config.getInt("simpleconsumer.max.caches.num");
        this.maxPollTime = config.getInt("simpleconsumer.max.poll.time");
        this.time = config.getTime();
        // A non-positive value means "caching disabled" (see pollRecords); clamp it so the
        // map constructor does not reject a negative initial capacity.
        this.highLevelCache = new ConcurrentHashMap<>(Math.max(0, this.maxCachesNum));
        this.oldestCache = new ConcurrentLinkedQueue<>();
    }

    /**
     * Fetches up to {@code count} records of the given partition starting at {@code offset}.
     *
     * <p>With caching enabled the request is served by the partition's cache (created on
     * demand); otherwise the consumer is polled directly.
     *
     * @param assignedConsumer consumer used for {@code seek} and {@code poll}
     * @param topicName        topic to read from
     * @param partitionId      partition to read from
     * @param offset           first offset to return
     * @param count            maximum number of records wanted
     * @return the records collected before the poll-time budget ran out; with caching
     *         enabled this is {@code null} when {@code count} is not positive
     */
    public List<ConsumerRecord<byte[], byte[]>> pollRecords(Consumer<byte[], byte[]> assignedConsumer, String topicName, int partitionId, long offset, long count) {
        if (this.maxCachesNum <= 0) {
            return CachePerTopicPartition.pollRecordsWithoutCaching(assignedConsumer, topicName, partitionId, offset, count, this.maxPollTime, this.time);
        }
        CachePerTopicPartition cache = this.getOrCreateCache(new TopicPartition(topicName, partitionId));
        return cache.pollRecords(assignedConsumer, offset, count);
    }

    /**
     * Returns the cache of the given partition, creating it (and evicting the earliest
     * created one when the limit is reached) if it does not exist yet.
     */
    private CachePerTopicPartition getOrCreateCache(TopicPartition topicPartition) {
        CachePerTopicPartition cache = this.highLevelCache.get(topicPartition);
        if (cache != null) {
            return cache;
        }
        // Lookup, eviction, insertion and queue bookkeeping must happen as one step:
        // otherwise two threads can register the same partition twice, or one thread can
        // see a full map while the queue is still empty.
        synchronized (this.highLevelCache) {
            cache = this.highLevelCache.get(topicPartition);
            if (cache == null) {
                if (this.highLevelCache.size() >= this.maxCachesNum) {
                    TopicPartition eldest = this.oldestCache.poll();
                    // ConcurrentHashMap rejects null keys, so only remove a real entry.
                    if (eldest != null) {
                        this.highLevelCache.remove(eldest);
                    }
                }
                cache = new CachePerTopicPartition(this.config, topicPartition);
                this.highLevelCache.put(topicPartition, cache);
                this.oldestCache.add(topicPartition);
            }
            return cache;
        }
    }

    /**
     * Offset-ordered record cache for one topic partition, bounded by
     * {@code simpleconsumer.cache.max.records}.
     */
    private static class CachePerTopicPartition {
        private final NavigableMap<Long, ConsumerRecord<byte[], byte[]>> cachedRecords;
        private final TopicPartition topicPartition;
        private final int cacheMaxSize;
        private final int maxPollTime;
        private final Time time;

        public CachePerTopicPartition(KafkaRestConfig config, TopicPartition topicPartition) {
            this.cacheMaxSize = config.getInt("simpleconsumer.cache.max.records");
            this.maxPollTime = config.getInt("simpleconsumer.max.poll.time");
            this.time = config.getTime();
            this.cachedRecords = new TreeMap<>();
            this.topicPartition = topicPartition;
        }

        /**
         * Collects the run of cached records whose offsets are exactly
         * {@code fromOffset, fromOffset + 1, ...}, stopping at the first gap or after
         * {@code count} records.
         *
         * @return the contiguous records found; empty (never {@code null}) when the cache
         *         does not hold {@code fromOffset}
         */
        private List<ConsumerRecord<byte[], byte[]>> getIfExists(long fromOffset, long count) {
            List<ConsumerRecord<byte[], byte[]>> found = new ArrayList<>();
            long expectedOffset = fromOffset;
            // tailMap plus an explicit bound avoids computing fromOffset + count - 1,
            // which can overflow for large arguments.
            for (ConsumerRecord<byte[], byte[]> record : this.cachedRecords.tailMap(fromOffset, true).values()) {
                if ((long)found.size() >= count || record.offset() != expectedOffset) {
                    break;
                }
                found.add(record);
                ++expectedOffset;
            }
            return found;
        }

        /**
         * Stores a record, evicting the lowest cached offset when the cache is full and the
         * new record has a higher offset.
         *
         * @return {@code true} if the record is in the cache afterwards
         */
        private boolean cacheRecord(ConsumerRecord<byte[], byte[]> record) {
            long offset = record.offset();
            // Replacing an already cached offset never grows the map, so it is always allowed.
            if (this.cachedRecords.containsKey(offset) || this.cachedRecords.size() < this.cacheMaxSize) {
                this.cachedRecords.put(offset, record);
                return true;
            }
            if (this.cachedRecords.isEmpty()) {
                // Only reachable when cacheMaxSize <= 0: nothing to evict, nothing may be stored.
                return false;
            }
            Long lowestOffset = this.cachedRecords.firstKey();
            if (lowestOffset < offset) {
                this.cachedRecords.remove(lowestOffset);
                this.cachedRecords.put(offset, record);
                return true;
            }
            return false;
        }

        /**
         * Fills {@code resultRecords} up to {@code count} entries, alternating between the
         * cache and the consumer until the request is satisfied, a poll comes back empty,
         * or {@code pollTime} milliseconds have elapsed.
         *
         * @return {@code true} if {@code resultRecords} holds at least {@code count} records
         */
        private boolean doPoll(Consumer<byte[], byte[]> assignedConsumer, List<ConsumerRecord<byte[], byte[]>> resultRecords, long startOffset, long count, long pollTime) {
            long endTime = this.time.milliseconds() + pollTime;
            long nextOffset = startOffset;
            while (true) {
                // Ask the cache only for what is still missing so the result never exceeds count.
                List<ConsumerRecord<byte[], byte[]>> cached = this.getIfExists(nextOffset, count - (long)resultRecords.size());
                if (!cached.isEmpty()) {
                    resultRecords.addAll(cached);
                    nextOffset = cached.get(cached.size() - 1).offset() + 1L;
                }
                if ((long)resultRecords.size() >= count) {
                    return true;
                }
                long remaining = endTime - this.time.milliseconds();
                if (remaining <= 0L) {
                    return false;
                }
                assignedConsumer.seek(this.topicPartition, nextOffset);
                ConsumerRecords<byte[], byte[]> polled = assignedConsumer.poll(remaining);
                if (polled.isEmpty()) {
                    return false;
                }
                Iterator<ConsumerRecord<byte[], byte[]>> it = polled.iterator();
                while (it.hasNext() && (long)resultRecords.size() < count) {
                    resultRecords.add(it.next());
                }
                if ((long)resultRecords.size() >= count) {
                    // Keep the surplus of this poll for a later request.
                    while (it.hasNext()) {
                        this.cacheRecord(it.next());
                    }
                    return true;
                }
                if (endTime - this.time.milliseconds() <= 0L) {
                    return false;
                }
                nextOffset = resultRecords.get(resultRecords.size() - 1).offset() + 1L;
            }
        }

        /**
         * Returns up to {@code count} records starting at {@code offset}, using cached
         * records where possible and polling for the rest within the configured
         * {@code simpleconsumer.max.poll.time}.
         *
         * @return the collected records, or {@code null} when {@code count} is not positive
         */
        public synchronized List<ConsumerRecord<byte[], byte[]>> pollRecords(Consumer<byte[], byte[]> assignedConsumer, long offset, long count) {
            if (count <= 0L) {
                return null;
            }
            List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
            this.doPoll(assignedConsumer, records, offset, count, this.maxPollTime);
            return records;
        }

        /**
         * Seeks to {@code offset} and polls repeatedly until exactly {@code count} records
         * have been collected or {@code maxPollTime} milliseconds have elapsed.
         *
         * <p>NOTE: collection stops only when the result size equals {@code count}; for a
         * non-positive {@code count} that never happens, so polling continues until the
         * time budget is used up.
         *
         * @return the records collected, possibly empty
         */
        public static List<ConsumerRecord<byte[], byte[]>> pollRecordsWithoutCaching(Consumer<byte[], byte[]> assignedConsumer, String topicName, int partitionId, long offset, long count, long maxPollTime, Time time) {
            assignedConsumer.seek(new TopicPartition(topicName, partitionId), offset);
            List<ConsumerRecord<byte[], byte[]>> result = new ArrayList<>();
            long endTime = time.milliseconds() + maxPollTime;
            long remaining;
            while ((remaining = endTime - time.milliseconds()) > 0L) {
                ConsumerRecords<byte[], byte[]> records = assignedConsumer.poll(remaining);
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    result.add(record);
                    if ((long)result.size() == count) {
                        return result;
                    }
                }
            }
            return result;
        }
    }
}

