/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.sync;

import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.sync.ForStDBCachingPriorityQueueSet;
import org.apache.flink.state.forst.sync.TreeOrderedSetCache;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;

public class ForStDBPriorityQueueSetFactory
implements PriorityQueueSetFactory {
    private final int cacheSize;
    @Nonnull
    private final DataOutputSerializer sharedElementOutView;
    @Nonnull
    private final DataInputDeserializer sharedElementInView;
    private final KeyGroupRange keyGroupRange;
    private final int keyGroupPrefixBytes;
    private final int numberOfKeyGroups;
    private final Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;
    private final RocksDB db;
    private final ReadOptions readOptions;
    private final ForStDBWriteBatchWrapper writeBatchWrapper;
    private final ForStNativeMetricMonitor nativeMetricMonitor;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final Long writeBufferManagerCapacity;

    ForStDBPriorityQueueSetFactory(KeyGroupRange keyGroupRange, int keyGroupPrefixBytes, int numberOfKeyGroups, Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, RocksDB db, ReadOptions readOptions, ForStDBWriteBatchWrapper writeBatchWrapper, ForStNativeMetricMonitor nativeMetricMonitor, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, Long writeBufferManagerCapacity, int cacheSize) {
        this.keyGroupRange = keyGroupRange;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.kvStateInformation = kvStateInformation;
        this.db = db;
        this.readOptions = readOptions;
        this.writeBatchWrapper = writeBatchWrapper;
        this.nativeMetricMonitor = nativeMetricMonitor;
        this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
        this.sharedElementOutView = new DataOutputSerializer(128);
        this.sharedElementInView = new DataInputDeserializer();
        this.writeBufferManagerCapacity = writeBufferManagerCapacity;
        Preconditions.checkArgument((cacheSize > 0 ? 1 : 0) != 0);
        this.cacheSize = cacheSize;
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return this.create(stateName, byteOrderedElementSerializer, false);
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, final @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        ForStOperationUtils.ForStKvStateInfo stateCFHandle = this.tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
        final ColumnFamilyHandle columnFamilyHandle = stateCFHandle.columnFamilyHandle;
        return new KeyGroupPartitionedPriorityQueue(KeyExtractorFunction.forKeyedObjects(), PriorityComparator.forPriorityComparableObjects(), new KeyGroupPartitionedPriorityQueue.PartitionQueueSetFactory<T, ForStDBCachingPriorityQueueSet<T>>(){

            @Nonnull
            public ForStDBCachingPriorityQueueSet<T> create(int keyGroupId, int numKeyGroups, @Nonnull KeyExtractorFunction<T> keyExtractor, @Nonnull PriorityComparator<T> elementPriorityComparator) {
                TreeOrderedSetCache orderedSetCache = new TreeOrderedSetCache(ForStDBPriorityQueueSetFactory.this.cacheSize);
                return new ForStDBCachingPriorityQueueSet(keyGroupId, ForStDBPriorityQueueSetFactory.this.keyGroupPrefixBytes, ForStDBPriorityQueueSetFactory.this.db, ForStDBPriorityQueueSetFactory.this.readOptions, columnFamilyHandle, byteOrderedElementSerializer, ForStDBPriorityQueueSetFactory.this.sharedElementOutView, ForStDBPriorityQueueSetFactory.this.sharedElementInView, ForStDBPriorityQueueSetFactory.this.writeBatchWrapper, orderedSetCache);
            }
        }, this.keyGroupRange, this.numberOfKeyGroups);
    }

    @Nonnull
    private <T> ForStOperationUtils.ForStKvStateInfo tryRegisterPriorityQueueMetaInfo(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer, boolean allowFutureMetadataUpdates) {
        ForStOperationUtils.ForStKvStateInfo stateInfo = this.kvStateInformation.get(stateName);
        if (stateInfo == null) {
            RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
            metaInfo = allowFutureMetadataUpdates ? metaInfo.withSerializerUpgradesAllowed() : metaInfo;
            stateInfo = ForStOperationUtils.createStateInfo((RegisteredStateMetaInfoBase)metaInfo, this.db, this.columnFamilyOptionsFactory, null, this.writeBufferManagerCapacity, ICloseableRegistry.NO_OP);
            ForStOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor, stateName, stateInfo);
        } else {
            RegisteredPriorityQueueStateBackendMetaInfo castedMetaInfo = (RegisteredPriorityQueueStateBackendMetaInfo)stateInfo.metaInfo;
            TypeSerializer previousElementSerializer = castedMetaInfo.getPreviousElementSerializer();
            if (previousElementSerializer != byteOrderedElementSerializer) {
                TypeSerializerSchemaCompatibility compatibilityResult = castedMetaInfo.updateElementSerializer(byteOrderedElementSerializer);
                if (compatibilityResult.isIncompatible()) {
                    throw new FlinkRuntimeException((Throwable)new StateMigrationException("The new priority queue serializer must not be incompatible."));
                }
                RegisteredPriorityQueueStateBackendMetaInfo metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo(stateName, byteOrderedElementSerializer);
                metaInfo = allowFutureMetadataUpdates ? metaInfo.withSerializerUpgradesAllowed() : metaInfo;
                stateInfo = new ForStOperationUtils.ForStKvStateInfo(stateInfo.columnFamilyHandle, (RegisteredStateMetaInfoBase)metaInfo);
                this.kvStateInformation.put(stateName, stateInfo);
            }
        }
        return stateInfo;
    }

    @VisibleForTesting
    public int getCacheSize() {
        return this.cacheSize;
    }
}

