/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionKeySerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;
import org.apache.kafka.streams.state.internals.WindowStoreUtils;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
 * A {@link SessionStore} decorator over a bytes-based inner store that records
 * every write in a changelog.
 *
 * <p>{@link #put} and {@link #remove} are first applied to the wrapped store and
 * then handed to a {@link StoreChangeLogger}. Read operations are forwarded to
 * the wrapped store without any logging.
 *
 * <p>{@link #init} must be called before {@link #put} or {@link #remove},
 * because the change logger is only created there.
 */
class ChangeLoggingSessionBytesStore
extends WrappedStateStore.AbstractStateStore
implements SessionStore<Bytes, byte[]> {
    /** The wrapped store that actually holds the session data. */
    private final SessionStore<Bytes, byte[]> bytesStore;
    /** Created in {@link #init}; {@code null} until then. */
    private StoreChangeLogger<Bytes, byte[]> changeLogger;

    /**
     * @param bytesStore the inner session store to delegate to
     */
    ChangeLoggingSessionBytesStore(SessionStore<Bytes, byte[]> bytesStore) {
        super(bytesStore);
        this.bytesStore = bytesStore;
    }

    /**
     * Initializes the wrapped store and then sets up the change logger.
     *
     * @param context the processor context supplying the application id and
     *                internal stream used to derive the changelog topic name
     * @param root    the root state store, passed through to the wrapped store
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        // The inner store is initialized first; its name is needed for the topic.
        this.bytesStore.init(context, root);
        // Topic and serde are only needed to construct the logger, so they stay local.
        String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.bytesStore.name(), context.applicationInternalStream());
        StateSerdes<Bytes, byte[]> innerStateSerde = WindowStoreUtils.getInnerStateSerde(topic);
        this.changeLogger = new StoreChangeLogger<Bytes, byte[]>(this.name(), context, innerStateSerde);
    }

    /**
     * Forwards the lookup to the wrapped store; nothing is logged.
     *
     * @param key                    the record key to search for
     * @param earliestSessionEndTime lower time bound, passed through unchanged
     * @param latestSessionStartTime upper time bound, passed through unchanged
     * @return the iterator returned by the wrapped store
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes key, long earliestSessionEndTime, long latestSessionStartTime) {
        return this.bytesStore.findSessions(key, earliestSessionEndTime, latestSessionStartTime);
    }

    /**
     * Forwards the key-range lookup to the wrapped store; nothing is logged.
     *
     * @param keyFrom                first key of the range
     * @param keyTo                  last key of the range
     * @param earliestSessionEndTime lower time bound, passed through unchanged
     * @param latestSessionStartTime upper time bound, passed through unchanged
     * @return the iterator returned by the wrapped store
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(Bytes keyFrom, Bytes keyTo, long earliestSessionEndTime, long latestSessionStartTime) {
        return this.bytesStore.findSessions(keyFrom, keyTo, earliestSessionEndTime, latestSessionStartTime);
    }

    /**
     * Removes the session from the wrapped store and logs the change with a
     * {@code null} value.
     *
     * @param sessionKey the windowed key of the session to remove
     */
    @Override
    public void remove(Windowed<Bytes> sessionKey) {
        this.bytesStore.remove(sessionKey);
        // Encode the key exactly as put() does so that the removal record and the
        // earlier write record share one changelog key encoding path.
        this.changeLogger.logChange(SessionKeySerde.bytesToBinary(sessionKey), null);
    }

    /**
     * Writes the session to the wrapped store and logs the change.
     *
     * @param sessionKey the windowed key of the session
     * @param aggregate  the serialized aggregate to store and log
     */
    @Override
    public void put(Windowed<Bytes> sessionKey, byte[] aggregate) {
        this.bytesStore.put(sessionKey, aggregate);
        this.changeLogger.logChange(SessionKeySerde.bytesToBinary(sessionKey), aggregate);
    }

    /**
     * Looks up sessions for a key using the widest time bounds
     * ({@code 0} to {@link Long#MAX_VALUE}).
     *
     * @param key the record key to search for
     * @return the result of {@link #findSessions(Bytes, long, long)} for those bounds
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes key) {
        return this.findSessions(key, 0L, Long.MAX_VALUE);
    }

    /**
     * Looks up sessions for a key range using the widest time bounds
     * ({@code 0} to {@link Long#MAX_VALUE}).
     *
     * @param from first key of the range
     * @param to   last key of the range
     * @return the result of {@link #findSessions(Bytes, Bytes, long, long)} for those bounds
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes from, Bytes to) {
        return this.findSessions(from, to, 0L, Long.MAX_VALUE);
    }
}

