/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.AbstractRocksDBTimeOrderedSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.KeyValueSegment;
import org.apache.kafka.streams.state.internals.PrefixedSessionKeySchemas;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SessionKeySchema;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

public class RocksDBTimeOrderedSessionSegmentedBytesStore
extends AbstractRocksDBTimeOrderedSegmentedBytesStore {
    RocksDBTimeOrderedSessionSegmentedBytesStore(String name, String metricsScope, long retention, long segmentInterval, boolean withIndex) {
        super(name, metricsScope, retention, segmentInterval, new PrefixedSessionKeySchemas.TimeFirstSessionKeySchema(), Optional.ofNullable(withIndex ? new PrefixedSessionKeySchemas.KeyFirstSessionKeySchema() : null));
    }

    public byte[] fetchSession(Bytes key, long sessionStartTime, long sessionEndTime) {
        return this.get(PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.toBinary(key, sessionStartTime, sessionEndTime));
    }

    public KeyValueIterator<Bytes, byte[]> fetchSessions(long earliestSessionEndTime, long latestSessionEndTime) {
        List searchSpace = this.segments.segments(earliestSessionEndTime, latestSessionEndTime, true);
        Bytes binaryFrom = this.baseKeySchema.lowerRangeFixedSize(null, earliestSessionEndTime);
        Bytes binaryTo = this.baseKeySchema.lowerRangeFixedSize(null, latestSessionEndTime + 1L);
        return new SegmentIterator(searchSpace.iterator(), iterator -> {
            while (iterator.hasNext()) {
                Bytes bytes = (Bytes)iterator.peekNextKey();
                Windowed<Bytes> windowedKey = PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.from(bytes);
                long endTime = windowedKey.window().end();
                if (endTime <= latestSessionEndTime && endTime >= earliestSessionEndTime) {
                    return true;
                }
                iterator.next();
            }
            return false;
        }, binaryFrom, binaryTo, true);
    }

    public void remove(Windowed<Bytes> key) {
        this.remove(PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.toBinary(key));
    }

    public void put(Windowed<Bytes> sessionKey, byte[] aggregate) {
        this.put(PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.toBinary(sessionKey), aggregate);
    }

    @Override
    protected KeyValue<Bytes, byte[]> getIndexKeyValue(Bytes baseKey, byte[] baseValue) {
        Window window = PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.extractWindow(baseKey.get());
        byte[] key = PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.extractKeyBytes(baseKey.get());
        return KeyValue.pair(PrefixedSessionKeySchemas.KeyFirstSessionKeySchema.toBinary(Bytes.wrap((byte[])key), window.start(), window.end()), new byte[0]);
    }

    @Override
    Map<KeyValueSegment, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> records) {
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long timestamp = SessionKeySchema.extractEndTimestamp((byte[])record.key());
            this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        }
        HashMap<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<KeyValueSegment, WriteBatch>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long timestamp = SessionKeySchema.extractEndTimestamp((byte[])record.key());
            long segmentId = this.segments.segmentId(timestamp);
            KeyValueSegment segment = (KeyValueSegment)this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
            if (segment == null) continue;
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(record, this.consistencyEnabled, this.position);
            try {
                WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
                if (this.hasIndex()) {
                    byte[] indexKey = PrefixedSessionKeySchemas.KeyFirstSessionKeySchema.prefixNonPrefixSessionKey((byte[])record.key());
                    byte[] value = record.value() == null ? null : new byte[]{};
                    segment.addToBatch(new KeyValue<byte[], byte[]>(indexKey, value), batch);
                }
                byte[] baseKey = PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.extractWindowBytesFromNonPrefixSessionKey((byte[])record.key());
                segment.addToBatch(new KeyValue<byte[], byte[]>(baseKey, (byte[])record.value()), batch);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.name(), e);
            }
        }
        return writeBatchMap;
    }

    @Override
    protected AbstractRocksDBTimeOrderedSegmentedBytesStore.IndexToBaseStoreIterator getIndexToBaseStoreIterator(SegmentIterator<KeyValueSegment> segmentIterator) {
        return new SessionKeySchemaIndexToBaseStoreIterator(segmentIterator);
    }

    private class SessionKeySchemaIndexToBaseStoreIterator
    extends AbstractRocksDBTimeOrderedSegmentedBytesStore.IndexToBaseStoreIterator {
        SessionKeySchemaIndexToBaseStoreIterator(KeyValueIterator<Bytes, byte[]> indexIterator) {
            super(indexIterator);
        }

        @Override
        protected Bytes getBaseKey(Bytes indexKey) {
            Window window = PrefixedSessionKeySchemas.KeyFirstSessionKeySchema.extractWindow(indexKey.get());
            byte[] key = PrefixedSessionKeySchemas.KeyFirstSessionKeySchema.extractKeyBytes(indexKey.get());
            return PrefixedSessionKeySchemas.TimeFirstSessionKeySchema.toBinary(Bytes.wrap((byte[])key), window.start(), window.end());
        }
    }
}

