/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.benchmark;

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackend;
import org.apache.flink.state.rocksdb.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.state.rocksdb.RocksDBPriorityQueueConfig;
import org.apache.flink.state.rocksdb.RocksDBResourceContainer;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.rocksdb.RocksDBException;

public class StateBackendBenchmarkUtils {
    private static final String rootDirName = "benchmark";
    private static final String recoveryDirName = "localRecovery";
    private static final String dbDirName = "dbPath";
    private static final String checkpointDirName = "checkpointPath";
    private static final String changelogDirName = "changelogPath";
    private static File rootDir;

    public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType, File baseDir, TtlTimeProvider ttlTimeProvider) throws IOException {
        switch (backendType) {
            case HEAP: {
                rootDir = StateBackendBenchmarkUtils.prepareDirectory(rootDirName, baseDir);
                return StateBackendBenchmarkUtils.createHeapKeyedStateBackend(rootDir, ttlTimeProvider);
            }
            case ROCKSDB: {
                rootDir = StateBackendBenchmarkUtils.prepareDirectory(rootDirName, baseDir);
                return StateBackendBenchmarkUtils.createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider);
            }
            case HEAP_CHANGELOG: {
                rootDir = StateBackendBenchmarkUtils.prepareDirectory(rootDirName, baseDir);
                return StateBackendBenchmarkUtils.createChangelogKeyedStateBackend(StateBackendBenchmarkUtils.createHeapKeyedStateBackend(rootDir, ttlTimeProvider));
            }
            case ROCKSDB_CHANGELOG: {
                rootDir = StateBackendBenchmarkUtils.prepareDirectory(rootDirName, baseDir);
                return StateBackendBenchmarkUtils.createChangelogKeyedStateBackend(StateBackendBenchmarkUtils.createRocksDBKeyedStateBackend(rootDir, ttlTimeProvider));
            }
            case BATCH_EXECUTION: {
                return StateBackendBenchmarkUtils.createBatchExecutionStateBackend(ttlTimeProvider);
            }
        }
        throw new IllegalArgumentException("Unknown backend type: " + backendType);
    }

    public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType, File baseDir) throws IOException {
        return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, baseDir, TtlTimeProvider.DEFAULT);
    }

    public static KeyedStateBackend<Long> createKeyedStateBackend(StateBackendType backendType) throws IOException {
        return StateBackendBenchmarkUtils.createKeyedStateBackend(backendType, null);
    }

    private static CheckpointableKeyedStateBackend<Long> createBatchExecutionStateBackend(TtlTimeProvider ttlTimeProvider) {
        MockEnvironment env = MockEnvironment.builder().build();
        JobID jobID = new JobID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        return new BatchExecutionStateBackend().createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)env, jobID, "Test", (TypeSerializer)new LongSerializer(), 2, keyGroupRange, null, ttlTimeProvider, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), null));
    }

    private static ChangelogKeyedStateBackend<Long> createChangelogKeyedStateBackend(AbstractKeyedStateBackend<Long> delegatedKeyedStateBackend) throws IOException {
        File cpPathFile = StateBackendBenchmarkUtils.prepareDirectory(checkpointDirName, rootDir);
        File changelogPathFile = StateBackendBenchmarkUtils.prepareDirectory(changelogDirName, rootDir);
        return new ChangelogKeyedStateBackend(delegatedKeyedStateBackend, "Test", new ExecutionConfig(), TtlTimeProvider.DEFAULT, (MetricGroup)UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(), ((StateChangelogStorage)Preconditions.checkNotNull((Object)StateChangelogStorageLoader.load((JobID)JobID.generate(), (Configuration)new Configuration().set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, (Object)"filesystem").set(FsStateChangelogOptions.BASE_PATH, (Object)changelogPathFile.getPath()), (TaskManagerJobMetricGroup)UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), (LocalRecoveryConfig)TestLocalRecoveryConfig.disabled()))).createWriter("test", KeyGroupRange.EMPTY_KEY_GROUP_RANGE, null), Collections.emptyList(), (CheckpointStorageWorkerView)new FsCheckpointStorageAccess(new Path(cpPathFile.getPath()), null, new JobID(), 1024, 4096));
    }

    private static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend(File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
        File recoveryBaseDir = StateBackendBenchmarkUtils.prepareDirectory(recoveryDirName, rootDir);
        File dbPathFile = StateBackendBenchmarkUtils.prepareDirectory(dbDirName, rootDir);
        ExecutionConfig executionConfig = new ExecutionConfig();
        RocksDBResourceContainer resourceContainer = new RocksDBResourceContainer();
        RocksDBKeyedStateBackendBuilder builder = new RocksDBKeyedStateBackendBuilder("Test", Thread.currentThread().getContextClassLoader(), dbPathFile, resourceContainer, stateName -> resourceContainer.getColumnOptions(), null, (TypeSerializer)LongSerializer.INSTANCE, 2, new KeyGroupRange(0, 1), executionConfig, LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED, RocksDBPriorityQueueConfig.buildWithPriorityQueueType((EmbeddedRocksDBStateBackend.PriorityQueueStateType)EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB), ttlTimeProvider, LatencyTrackingStateConfig.disabled(), (MetricGroup)new UnregisteredMetricsGroup(), (key, value) -> {}, Collections.emptyList(), AbstractStateBackend.getCompressionDecorator((ExecutionConfig)executionConfig), new CloseableRegistry());
        try {
            return builder.build();
        }
        catch (Exception e) {
            IOUtils.closeQuietly((AutoCloseable)resourceContainer);
            throw e;
        }
    }

    private static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend(File rootDir, TtlTimeProvider ttlTimeProvider) throws IOException {
        File recoveryBaseDir = StateBackendBenchmarkUtils.prepareDirectory(recoveryDirName, rootDir);
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
        ExecutionConfig executionConfig = new ExecutionConfig();
        HeapPriorityQueueSetFactory priorityQueueSetFactory = new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
        HeapKeyedStateBackendBuilder backendBuilder = new HeapKeyedStateBackendBuilder(null, (TypeSerializer)new LongSerializer(), Thread.currentThread().getContextClassLoader(), numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, LatencyTrackingStateConfig.disabled(), Collections.emptyList(), AbstractStateBackend.getCompressionDecorator((ExecutionConfig)executionConfig), LocalRecoveryConfig.BACKUP_AND_RECOVERY_DISABLED, priorityQueueSetFactory, false, new CloseableRegistry());
        return backendBuilder.build();
    }

    public static File prepareDirectory(String prefix, File parentDir) throws IOException {
        File target = File.createTempFile(prefix, "", parentDir);
        if (target.exists() && !target.delete()) {
            throw new IOException("Target dir {" + target.getAbsolutePath() + "} exists but failed to clean it up");
        }
        if (!target.mkdirs()) {
            throw new IOException("Failed to create target directory: " + target.getAbsolutePath());
        }
        return target;
    }

    public static <T> ValueState<T> getValueState(KeyedStateBackend<T> backend, ValueStateDescriptor<T> stateDescriptor) throws Exception {
        return (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    public static <T> ListState<T> getListState(KeyedStateBackend<T> backend, ListStateDescriptor<T> stateDescriptor) throws Exception {
        return (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    public static <K, V> MapState<K, V> getMapState(KeyedStateBackend<K> backend, MapStateDescriptor<K, V> stateDescriptor) throws Exception {
        return (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    public static <K, S extends State, T> void applyToAllKeys(KeyedStateBackend<K> backend, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        backend.applyToAllKeys((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor, function);
    }

    public static <K, S extends State, T> boolean compactState(KeyedStateBackend<K> backend, StateDescriptor<S, T> stateDescriptor) throws RocksDBException {
        if (!(backend instanceof RocksDBKeyedStateBackend)) {
            return false;
        }
        ((RocksDBKeyedStateBackend)backend).compactState(stateDescriptor);
        return true;
    }

    public static void cleanUp(KeyedStateBackend<?> backend) throws IOException {
        backend.dispose();
        if (rootDir != null) {
            Path path = Path.fromLocalFile((File)rootDir);
            path.getFileSystem().delete(path, true);
        }
    }

    public static enum StateBackendType {
        HEAP,
        ROCKSDB,
        HEAP_CHANGELOG,
        ROCKSDB_CHANGELOG,
        BATCH_EXECUTION;

    }
}

