/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.AbstractSegments;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment>
implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
    private final String name;
    protected final AbstractSegments<S> segments;
    protected final SegmentedBytesStore.KeySchema baseKeySchema;
    protected final Optional<SegmentedBytesStore.KeySchema> indexKeySchema;
    private final long retentionPeriod;
    protected ProcessorContext context;
    private StateStoreContext stateStoreContext;
    private Sensor expiredRecordSensor;
    protected long observedStreamTime = -1L;
    protected boolean consistencyEnabled = false;
    protected Position position;
    protected OffsetCheckpoint positionCheckpoint;
    private volatile boolean open;

    AbstractDualSchemaRocksDBSegmentedBytesStore(String name, SegmentedBytesStore.KeySchema baseKeySchema, Optional<SegmentedBytesStore.KeySchema> indexKeySchema, AbstractSegments<S> segments, long retentionPeriod) {
        this.name = name;
        this.baseKeySchema = baseKeySchema;
        this.indexKeySchema = indexKeySchema;
        this.segments = segments;
        this.retentionPeriod = retentionPeriod;
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> all() {
        long actualFrom = this.getActualFrom(0L, this.baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
        List<S> searchSpace = this.segments.allSegments(true);
        Bytes from = this.baseKeySchema.lowerRange(null, actualFrom);
        Bytes to = this.baseKeySchema.upperRange(null, Long.MAX_VALUE);
        return new SegmentIterator<S>(searchSpace.iterator(), this.baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, true), from, to, true);
    }

    @Override
    public KeyValueIterator<Bytes, byte[]> backwardAll() {
        long actualFrom = this.getActualFrom(0L, this.baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
        List<S> searchSpace = this.segments.allSegments(false);
        Bytes from = this.baseKeySchema.lowerRange(null, actualFrom);
        Bytes to = this.baseKeySchema.upperRange(null, Long.MAX_VALUE);
        return new SegmentIterator<S>(searchSpace.iterator(), this.baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, false), from, to, false);
    }

    @Override
    public void remove(Bytes rawBaseKey) {
        long timestamp = this.baseKeySchema.segmentTimestamp(rawBaseKey);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        S segment = this.segments.getSegmentForTimestamp(timestamp);
        if (segment == null) {
            return;
        }
        segment.delete((Bytes)rawBaseKey);
        if (this.hasIndex()) {
            KeyValue<Bytes, byte[]> kv = this.getIndexKeyValue(rawBaseKey, null);
            segment.delete((Bytes)((Bytes)kv.key));
        }
    }

    protected abstract KeyValue<Bytes, byte[]> getIndexKeyValue(Bytes var1, byte[] var2);

    protected long getActualFrom(long from, boolean isTimeFirstWindowSchema) {
        return isTimeFirstWindowSchema ? Math.max(from, this.observedStreamTime - this.retentionPeriod) : Math.max(from, this.observedStreamTime - this.retentionPeriod + 1L);
    }

    void putIndex(Bytes indexKey, byte[] value) {
        if (!this.hasIndex()) {
            throw new IllegalStateException("Index store doesn't exist");
        }
        long timestamp = this.indexKeySchema.get().segmentTimestamp(indexKey);
        long segmentId = this.segments.segmentId(timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
        if (segment != null) {
            segment.put((Bytes)indexKey, (byte[])value);
        }
    }

    byte[] getIndex(Bytes indexKey) {
        if (!this.hasIndex()) {
            throw new IllegalStateException("Index store doesn't exist");
        }
        long timestamp = this.indexKeySchema.get().segmentTimestamp(indexKey);
        long segmentId = this.segments.segmentId(timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
        if (segment != null) {
            return (byte[])segment.get((Bytes)indexKey);
        }
        return null;
    }

    void removeIndex(Bytes indexKey) {
        if (!this.hasIndex()) {
            throw new IllegalStateException("Index store doesn't exist");
        }
        long timestamp = this.indexKeySchema.get().segmentTimestamp(indexKey);
        long segmentId = this.segments.segmentId(timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
        if (segment != null) {
            segment.delete((Bytes)indexKey);
        }
    }

    @Override
    public void put(Bytes rawBaseKey, byte[] value) {
        long timestamp = this.baseKeySchema.segmentTimestamp(rawBaseKey);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        long segmentId = this.segments.segmentId(timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
        if (segment == null) {
            this.expiredRecordSensor.record(1.0, this.context.currentSystemTimeMs());
            LOG.warn("Skipping record for expired segment.");
        } else {
            StoreQueryUtils.updatePosition(this.position, this.stateStoreContext);
            if (this.hasIndex()) {
                KeyValue<Bytes, byte[]> indexKeyValue = this.getIndexKeyValue(rawBaseKey, value);
                segment.put((Bytes)((Bytes)indexKeyValue.key), (byte[])((byte[])indexKeyValue.value));
            }
            segment.put((Bytes)rawBaseKey, (byte[])value);
        }
    }

    @Override
    public byte[] get(Bytes rawKey) {
        S segment;
        long timestampFromRawKey = this.baseKeySchema.segmentTimestamp(rawKey);
        if (this.baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
            if (timestampFromRawKey < this.observedStreamTime - this.retentionPeriod) {
                LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})", new Object[]{rawKey.toString(), timestampFromRawKey, this.observedStreamTime - this.retentionPeriod});
                return null;
            }
        } else if (timestampFromRawKey < this.observedStreamTime - this.retentionPeriod + 1L) {
            LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})", new Object[]{rawKey.toString(), timestampFromRawKey, this.observedStreamTime - this.retentionPeriod + 1L});
            return null;
        }
        if ((segment = this.segments.getSegmentForTimestamp(timestampFromRawKey)) == null) {
            return null;
        }
        return (byte[])segment.get((Bytes)rawKey);
    }

    @Override
    public String name() {
        return this.name;
    }

    @Override
    @Deprecated
    public void init(ProcessorContext context, StateStore root) {
        this.context = context;
        StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
        String threadId = Thread.currentThread().getName();
        String taskName = context.taskId().toString();
        this.expiredRecordSensor = TaskMetrics.droppedRecordsSensor(threadId, taskName, metrics);
        this.segments.openExisting(context, this.observedStreamTime);
        File positionCheckpointFile = new File(context.stateDir(), this.name() + ".position");
        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
        this.position = StoreQueryUtils.readPositionFromCheckpoint(this.positionCheckpoint);
        this.stateStoreContext.register(root, this::restoreAllInternal, () -> StoreQueryUtils.checkpointPosition(this.positionCheckpoint, this.position));
        this.open = true;
        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(context.appConfigs(), "__iq.consistency.offset.vector.enabled__", false);
    }

    @Override
    public void init(StateStoreContext context, StateStore root) {
        this.stateStoreContext = context;
        this.init(StoreToProcessorContextAdapter.adapt(context), root);
    }

    @Override
    public void flush() {
        this.segments.flush();
    }

    @Override
    public void close() {
        this.open = false;
        this.segments.close();
    }

    @Override
    public boolean persistent() {
        return true;
    }

    @Override
    public boolean isOpen() {
        return this.open;
    }

    List<S> getSegments() {
        return this.segments.allSegments(false);
    }

    void restoreAllInternal(Collection<ConsumerRecord<byte[], byte[]>> records) {
        try {
            Map<S, WriteBatch> writeBatchMap = this.getWriteBatches(records);
            for (Map.Entry<S, WriteBatch> entry : writeBatchMap.entrySet()) {
                Segment segment = (Segment)entry.getKey();
                WriteBatch batch = entry.getValue();
                segment.write(batch);
                batch.close();
            }
        }
        catch (RocksDBException e) {
            throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
        }
    }

    abstract Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> var1);

    @Override
    public Position getPosition() {
        return this.position;
    }

    public boolean hasIndex() {
        return this.indexKeySchema.isPresent();
    }
}

