/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemorySessionStore
implements SessionStore<Bytes, byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
    private final String name;
    private final String metricScope;
    private Sensor expiredRecordSensor;
    private InternalProcessorContext context;
    private long observedStreamTime = -1L;
    private final long retentionPeriod;
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> endTimeMap = new ConcurrentSkipListMap<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>();
    private final Set<InMemorySessionStoreIterator> openIterators = ConcurrentHashMap.newKeySet();
    private volatile boolean open = false;

    InMemorySessionStore(String name, long retentionPeriod, String metricScope) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.metricScope = metricScope;
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.context = (InternalProcessorContext)context;
        StreamsMetricsImpl metrics = this.context.metrics();
        String threadId = Thread.currentThread().getName();
        String taskName = context.taskId().toString();
        this.expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(threadId, taskName, this.metricScope, this.name, metrics);
        if (root != null) {
            context.register(root, (key, value) -> this.put(SessionKeySchema.from(Bytes.wrap((byte[])key)), value));
        }
        this.open = true;
    }

    @Override
    public void put(Windowed<Bytes> sessionKey, byte[] aggregate) {
        this.removeExpiredSegments();
        long windowEndTimestamp = sessionKey.window().end();
        this.observedStreamTime = Math.max(this.observedStreamTime, windowEndTimestamp);
        if (windowEndTimestamp <= this.observedStreamTime - this.retentionPeriod) {
            this.expiredRecordSensor.record(1.0, this.context.currentSystemTimeMs());
            LOG.warn("Skipping record for expired segment.");
        } else if (aggregate != null) {
            this.endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap());
            ConcurrentNavigableMap keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(windowEndTimestamp);
            keyMap.computeIfAbsent(sessionKey.key(), t -> new ConcurrentSkipListMap());
            ((ConcurrentNavigableMap)keyMap.get(sessionKey.key())).put(sessionKey.window().start(), aggregate);
        } else {
            this.remove(sessionKey);
        }
    }

    @Override
    public void remove(Windowed<Bytes> sessionKey) {
        ConcurrentNavigableMap keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(sessionKey.window().end());
        if (keyMap == null) {
            return;
        }
        ConcurrentNavigableMap startTimeMap = (ConcurrentNavigableMap)keyMap.get(sessionKey.key());
        if (startTimeMap == null) {
            return;
        }
        startTimeMap.remove(sessionKey.window().start());
        if (startTimeMap.isEmpty()) {
            keyMap.remove(sessionKey.key());
            if (keyMap.isEmpty()) {
                this.endTimeMap.remove(sessionKey.window().end());
            }
        }
    }

    @Override
    public byte[] fetchSession(Bytes key, long startTime, long endTime) {
        ConcurrentNavigableMap startTimeMap;
        ConcurrentNavigableMap keyMap;
        this.removeExpiredSegments();
        Objects.requireNonNull(key, "key cannot be null");
        if (endTime > this.observedStreamTime - this.retentionPeriod && (keyMap = (ConcurrentNavigableMap)this.endTimeMap.get(endTime)) != null && (startTimeMap = (ConcurrentNavigableMap)keyMap.get(key)) != null) {
            return (byte[])startTimeMap.get(startTime);
        }
        return null;
    }

    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).entrySet().iterator());
    }

    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        Objects.requireNonNull(keyFrom, "from key cannot be null");
        Objects.requireNonNull(keyTo, "to key cannot be null");
        this.removeExpiredSegments();
        if (keyFrom.compareTo(keyTo) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }
        return this.registerNewIterator(keyFrom, keyTo, latestSessionStartTime, this.endTimeMap.tailMap((Object)earliestSessionEndTime, true).entrySet().iterator());
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes key) {
        Objects.requireNonNull(key, "key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(key, key, Long.MAX_VALUE, this.endTimeMap.entrySet().iterator());
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to) {
        Objects.requireNonNull(from, "from key cannot be null");
        Objects.requireNonNull(to, "to key cannot be null");
        this.removeExpiredSegments();
        return this.registerNewIterator(from, to, Long.MAX_VALUE, this.endTimeMap.entrySet().iterator());
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() {
        if (this.openIterators.size() != 0) {
            LOG.warn("Closing {} open iterators for store {}", (Object)this.openIterators.size(), (Object)this.name);
            for (InMemorySessionStoreIterator it : this.openIterators) {
                it.close();
            }
        }
        this.endTimeMap.clear();
        this.openIterators.clear();
        this.open = false;
    }

    private void removeExpiredSegments() {
        long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod + 1L);
        for (InMemorySessionStoreIterator it : this.openIterators) {
            minLiveTime = Math.min(minLiveTime, it.minTime());
        }
        this.endTimeMap.headMap((Object)minLiveTime, false).clear();
    }

    private InMemorySessionStoreIterator registerNewIterator(Bytes keyFrom, Bytes keyTo, long latestSessionStartTime, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator) {
        InMemorySessionStoreIterator iterator = new InMemorySessionStoreIterator(keyFrom, keyTo, latestSessionStartTime, endTimeIterator, it -> this.openIterators.remove(it));
        this.openIterators.add(iterator);
        return iterator;
    }

    private static class InMemorySessionStoreIterator
    implements KeyValueIterator<Windowed<Bytes>, byte[]> {
        private final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator;
        private Iterator<Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>>> keyIterator;
        private Iterator<Map.Entry<Long, byte[]>> recordIterator;
        private KeyValue<Windowed<Bytes>, byte[]> next;
        private Bytes currentKey;
        private long currentEndTime;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long latestSessionStartTime;
        private final ClosingCallback callback;

        InMemorySessionStoreIterator(Bytes keyFrom, Bytes keyTo, long latestSessionStartTime, Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>>> endTimeIterator, ClosingCallback callback) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.latestSessionStartTime = latestSessionStartTime;
            this.endTimeIterator = endTimeIterator;
            this.callback = callback;
            this.setAllIterators();
        }

        @Override
        public boolean hasNext() {
            if (this.next != null) {
                return true;
            }
            if (this.recordIterator == null) {
                return false;
            }
            this.next = this.getNext();
            return this.next != null;
        }

        @Override
        public Windowed<Bytes> peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return (Windowed)this.next.key;
        }

        @Override
        public KeyValue<Windowed<Bytes>, byte[]> next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Windowed<Bytes>, byte[]> ret = this.next;
            this.next = null;
            return ret;
        }

        @Override
        public void close() {
            this.next = null;
            this.recordIterator = null;
            this.callback.deregisterIterator(this);
        }

        Long minTime() {
            return this.currentEndTime;
        }

        private KeyValue<Windowed<Bytes>, byte[]> getNext() {
            if (!this.recordIterator.hasNext()) {
                this.getNextIterators();
            }
            if (this.recordIterator == null) {
                return null;
            }
            Map.Entry<Long, byte[]> nextRecord = this.recordIterator.next();
            SessionWindow sessionWindow = new SessionWindow(nextRecord.getKey(), this.currentEndTime);
            Windowed<Bytes> windowedKey = new Windowed<Bytes>(this.currentKey, sessionWindow);
            return new KeyValue<Windowed<Bytes>, byte[]>(windowedKey, nextRecord.getValue());
        }

        private void setAllIterators() {
            while (this.endTimeIterator.hasNext()) {
                Map.Entry<Long, ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>>> nextEndTimeEntry = this.endTimeIterator.next();
                this.currentEndTime = nextEndTimeEntry.getKey();
                this.keyIterator = nextEndTimeEntry.getValue().subMap((Object)this.keyFrom, true, (Object)this.keyTo, true).entrySet().iterator();
                if (!this.setInnerIterators()) continue;
                return;
            }
            this.recordIterator = null;
        }

        private boolean setInnerIterators() {
            while (this.keyIterator.hasNext()) {
                Map.Entry<Bytes, ConcurrentNavigableMap<Long, byte[]>> nextKeyEntry = this.keyIterator.next();
                this.currentKey = nextKeyEntry.getKey();
                this.recordIterator = this.latestSessionStartTime == Long.MAX_VALUE ? nextKeyEntry.getValue().entrySet().iterator() : nextKeyEntry.getValue().headMap((Object)this.latestSessionStartTime, true).entrySet().iterator();
                if (!this.recordIterator.hasNext()) continue;
                return true;
            }
            return false;
        }

        private void getNextIterators() {
            if (this.setInnerIterators()) {
                return;
            }
            this.setAllIterators();
        }
    }

    static interface ClosingCallback {
        public void deregisterIterator(InMemorySessionStoreIterator var1);
    }
}

