/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.rocksdb.ttl;

import java.io.IOException;
import java.time.Duration;
import java.util.LinkedHashMap;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlUtils;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.FlinkCompactionFilter;
import org.rocksdb.InfoLogLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDbTtlCompactFiltersManager {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkCompactionFilter.class);
    private final TtlTimeProvider ttlTimeProvider;
    private final LinkedHashMap<String, FlinkCompactionFilter.FlinkCompactionFilterFactory> compactionFilterFactories;
    private final LinkedHashMap<String, ColumnFamilyOptions> columnFamilyOptionsMap;
    private final long queryTimeAfterNumEntries;
    private final Duration periodicCompactionTime;

    public RocksDbTtlCompactFiltersManager(TtlTimeProvider ttlTimeProvider, long queryTimeAfterNumEntries, Duration periodicCompactionTime) {
        this.ttlTimeProvider = ttlTimeProvider;
        this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
        this.periodicCompactionTime = periodicCompactionTime;
        this.compactionFilterFactories = new LinkedHashMap();
        this.columnFamilyOptionsMap = new LinkedHashMap();
    }

    public void setAndRegisterCompactFilterIfStateTtl(@Nonnull RegisteredStateMetaInfoBase metaInfoBase, @Nonnull ColumnFamilyOptions options) {
        RegisteredKeyValueStateBackendMetaInfo kvMetaInfoBase;
        if (metaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo && TtlStateFactory.TtlSerializer.isTtlStateSerializer((TypeSerializer)(kvMetaInfoBase = (RegisteredKeyValueStateBackendMetaInfo)metaInfoBase).getStateSerializer())) {
            this.createAndSetCompactFilterFactory(metaInfoBase.getName(), options);
        }
    }

    private void createAndSetCompactFilterFactory(String stateName, @Nonnull ColumnFamilyOptions options) {
        FlinkCompactionFilter.FlinkCompactionFilterFactory compactionFilterFactory = new FlinkCompactionFilter.FlinkCompactionFilterFactory((FlinkCompactionFilter.TimeProvider)new TimeProviderWrapper(this.ttlTimeProvider), RocksDbTtlCompactFiltersManager.createRocksDbNativeLogger());
        options.setCompactionFilterFactory((AbstractCompactionFilterFactory)compactionFilterFactory);
        this.compactionFilterFactories.put(stateName, compactionFilterFactory);
        this.columnFamilyOptionsMap.put(stateName, options);
    }

    private static org.rocksdb.Logger createRocksDbNativeLogger() {
        if (LOG.isDebugEnabled()) {
            try (DBOptions opts = new DBOptions().setInfoLogLevel(InfoLogLevel.DEBUG_LEVEL);){
                org.rocksdb.Logger logger = new org.rocksdb.Logger(opts){

                    protected void log(InfoLogLevel infoLogLevel, String logMsg) {
                        LOG.debug("RocksDB filter native code log: " + logMsg);
                    }
                };
                return logger;
            }
        }
        return null;
    }

    public void configCompactFilter(@Nonnull StateDescriptor<?, ?> stateDesc, TypeSerializer<?> stateSerializer) {
        StateTtlConfig ttlConfig = stateDesc.getTtlConfig();
        if (ttlConfig.isEnabled() && ttlConfig.getCleanupStrategies().inRocksdbCompactFilter()) {
            TypeSerializer elemSerializer;
            int len;
            FlinkCompactionFilter.FlinkCompactionFilterFactory compactionFilterFactory = this.compactionFilterFactories.get(stateDesc.getName());
            Preconditions.checkNotNull((Object)compactionFilterFactory);
            long ttl = ttlConfig.getTimeToLive().toMillis();
            ColumnFamilyOptions columnFamilyOptions = this.columnFamilyOptionsMap.get(stateDesc.getName());
            Preconditions.checkNotNull((Object)columnFamilyOptions);
            StateTtlConfig.RocksdbCompactFilterCleanupStrategy rocksdbCompactFilterCleanupStrategy = ttlConfig.getCleanupStrategies().getRocksdbCompactFilterCleanupStrategy();
            Duration periodicCompactionTime = this.periodicCompactionTime;
            long queryTimeAfterNumEntries = this.queryTimeAfterNumEntries;
            if (rocksdbCompactFilterCleanupStrategy != null) {
                periodicCompactionTime = rocksdbCompactFilterCleanupStrategy.getPeriodicCompactionTime();
                queryTimeAfterNumEntries = rocksdbCompactFilterCleanupStrategy.getQueryTimeAfterNumEntries();
            }
            if (periodicCompactionTime != null) {
                columnFamilyOptions.setPeriodicCompactionSeconds(periodicCompactionTime.getSeconds());
            }
            FlinkCompactionFilter.Config config = stateDesc instanceof ListStateDescriptor ? ((len = (elemSerializer = ((ListSerializer)stateSerializer).getElementSerializer()).getLength()) > 0 ? FlinkCompactionFilter.Config.createForFixedElementList((long)ttl, (long)queryTimeAfterNumEntries, (int)(len + 1)) : FlinkCompactionFilter.Config.createForList((long)ttl, (long)queryTimeAfterNumEntries, new ListElementFilterFactory(elemSerializer.duplicate()))) : (stateDesc instanceof MapStateDescriptor ? FlinkCompactionFilter.Config.createForMap((long)ttl, (long)queryTimeAfterNumEntries) : FlinkCompactionFilter.Config.createForValue((long)ttl, (long)queryTimeAfterNumEntries));
            compactionFilterFactory.configure(config);
        }
    }

    public void disposeAndClearRegisteredCompactionFactories() {
        for (FlinkCompactionFilter.FlinkCompactionFilterFactory factory : this.compactionFilterFactories.values()) {
            IOUtils.closeQuietly((AutoCloseable)factory);
        }
        this.compactionFilterFactories.clear();
        this.columnFamilyOptionsMap.clear();
    }

    private static class TimeProviderWrapper
    implements FlinkCompactionFilter.TimeProvider {
        private final TtlTimeProvider ttlTimeProvider;

        private TimeProviderWrapper(TtlTimeProvider ttlTimeProvider) {
            this.ttlTimeProvider = ttlTimeProvider;
        }

        public long currentTimestamp() {
            return this.ttlTimeProvider.currentTimestamp();
        }
    }

    private static class ListElementFilterFactory<T>
    implements FlinkCompactionFilter.ListElementFilterFactory {
        private final TypeSerializer<T> serializer;

        private ListElementFilterFactory(TypeSerializer<T> serializer) {
            this.serializer = serializer;
        }

        public FlinkCompactionFilter.ListElementFilter createListElementFilter() {
            return new ListElementFilter<T>(this.serializer);
        }
    }

    private static class ListElementFilter<T>
    implements FlinkCompactionFilter.ListElementFilter {
        private final TypeSerializer<T> serializer;
        private DataInputDeserializer input;

        private ListElementFilter(TypeSerializer<T> serializer) {
            this.serializer = serializer;
            this.input = new DataInputDeserializer();
        }

        public int nextUnexpiredOffset(byte[] bytes, long ttl, long currentTimestamp) {
            this.input.setBuffer(bytes);
            int lastElementOffset = 0;
            while (this.input.available() > 0) {
                try {
                    long timestamp = this.nextElementLastAccessTimestamp();
                    if (!TtlUtils.expired((long)timestamp, (long)ttl, (long)currentTimestamp)) break;
                    lastElementOffset = this.input.getPosition();
                }
                catch (IOException e) {
                    throw new FlinkRuntimeException("Failed to deserialize list element for TTL compaction filter", (Throwable)e);
                }
            }
            return lastElementOffset;
        }

        private long nextElementLastAccessTimestamp() throws IOException {
            TtlValue ttlValue = (TtlValue)this.serializer.deserialize((DataInputView)this.input);
            if (this.input.available() > 0) {
                this.input.skipBytesToRead(1);
            }
            return ttlValue.getLastAccessTimestamp();
        }
    }
}

