/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageFactory;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.assertj.core.api.ObjectAssert;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CheckpointStorageLoaderTest {
    /**
     * Logger handed to {@code CheckpointStorageLoader.load} by the tests. Declared
     * {@code static final} so a single instance is shared instead of being looked up again for
     * every test instance.
     */
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointStorageLoaderTest.class);

    /** Temporary directory under which the tests create their per-case folders. */
    @TempDir
    private java.nio.file.Path tmp;

    /** Class loader of this test class, passed to every loader call. */
    private final ClassLoader cl = this.getClass().getClassLoader();

    CheckpointStorageLoaderTest() {
    }

    /**
     * With an empty configuration, {@code fromConfig} is expected to return an empty
     * {@link Optional}.
     */
    @Test
    void testNoCheckpointStorageDefined() throws Exception {
        Configuration emptyConfig = new Configuration();

        Assertions.assertThat(CheckpointStorageLoader.fromConfig(emptyConfig, cl, null))
                .isNotPresent();
    }

    /**
     * A state backend that also implements {@link CheckpointStorage} must be returned by
     * {@code load} even though a separate storage instance is passed in explicitly.
     */
    @Test
    void testLegacyStateBackendTakesPrecedence() throws Exception {
        LegacyStateBackend legacyBackend = new LegacyStateBackend();
        MockStorage applicationStorage = new MockStorage();

        CheckpointStorage resolved =
                CheckpointStorageLoader.load(
                        applicationStorage,
                        legacyBackend,
                        new Configuration(),
                        new Configuration(),
                        cl,
                        LOG);

        Assertions.assertThat(resolved)
                .withFailMessage("Legacy state backends should always take precedence")
                .isEqualTo(legacyBackend);
    }

    /**
     * A state backend that does not implement {@link CheckpointStorage} must not displace the
     * explicitly passed storage: {@code load} is expected to return that storage.
     */
    @Test
    void testModernStateBackendDoesNotTakePrecedence() throws Exception {
        ModernStateBackend modernBackend = new ModernStateBackend();
        MockStorage applicationStorage = new MockStorage();

        CheckpointStorage resolved =
                CheckpointStorageLoader.load(
                        applicationStorage,
                        modernBackend,
                        new Configuration(),
                        new Configuration(),
                        cl,
                        LOG);

        Assertions.assertThat(resolved)
                .withFailMessage("Modern state backends should never take precedence")
                .isEqualTo(applicationStorage);
    }

    /**
     * Loads the storage through a factory class name. The job configuration names
     * {@link WorkingFactory} while the cluster configuration names {@code "jobmanager"}; with both
     * present the factory's {@link MockStorage} is expected, and with an empty job configuration
     * the cluster setting is expected to apply.
     */
    @Test
    void testLoadingFromFactory() throws Exception {
        final Configuration jobConfig = new Configuration();
        final Configuration clusterConfig = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        jobConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName());
        clusterConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");

        final CheckpointStorage fromJobConfig =
                CheckpointStorageLoader.load(
                        null, new ModernStateBackend(), jobConfig, clusterConfig, cl, LOG);
        Assertions.assertThat(fromJobConfig).isInstanceOf(MockStorage.class);

        final CheckpointStorage fromClusterConfig =
                CheckpointStorageLoader.load(
                        null,
                        new ModernStateBackend(),
                        new Configuration(),
                        clusterConfig,
                        cl,
                        LOG);
        Assertions.assertThat(fromClusterConfig).isInstanceOf(JobManagerCheckpointStorage.class);
    }

    /**
     * Covers the case where no checkpoint storage is named explicitly.
     *
     * <ul>
     *   <li>Two empty configurations: a {@link JobManagerCheckpointStorage} is expected.
     *   <li>Checkpoint directory set in both job and cluster configuration: a
     *       {@link FileSystemCheckpointStorage} pointing at the job-level directory is expected.
     *   <li>Checkpoint directory set only in the cluster configuration: the cluster-level
     *       directory is expected.
     * </ul>
     */
    @Test
    void testDefaultCheckpointStorage() throws Exception {
        final CheckpointStorage storage1 =
                CheckpointStorageLoader.load(
                        null,
                        new ModernStateBackend(),
                        new Configuration(),
                        new Configuration(),
                        cl,
                        LOG);
        Assertions.assertThat(storage1).isInstanceOf(JobManagerCheckpointStorage.class);

        final String checkpointDir1 = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final String checkpointDir2 = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final Configuration jobConfig = new Configuration();
        final Configuration clusterConfig = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        jobConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir1);
        clusterConfig.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir2);

        final CheckpointStorage storage2 =
                CheckpointStorageLoader.load(
                        null, new ModernStateBackend(), jobConfig, clusterConfig, cl, LOG);
        Assertions.assertThat(((FileSystemCheckpointStorage) storage2).getCheckpointPath())
                .isEqualTo(new Path(jobConfig.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY)));

        final CheckpointStorage storage3 =
                CheckpointStorageLoader.load(
                        null,
                        new ModernStateBackend(),
                        new Configuration(),
                        clusterConfig,
                        cl,
                        LOG);
        Assertions.assertThat(((FileSystemCheckpointStorage) storage3).getCheckpointPath())
                .isEqualTo(new Path(clusterConfig.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY)));
    }

    /**
     * Checks the failure modes of loading a storage by name from the cluster configuration:
     *
     * <ul>
     *   <li>a class name that does not exist: {@link DynamicCodeLoadingException};
     *   <li>an existing class that is not a storage factory ({@link File}):
     *       {@link DynamicCodeLoadingException};
     *   <li>a factory that throws: its {@link IllegalConfigurationException} is surfaced.
     * </ul>
     */
    @Test
    void testLoadingFails() throws Exception {
        // Mutated between the three cases; each lambda reads it when it is invoked.
        final Configuration config = new Configuration();

        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist");
        Assertions.assertThatThrownBy(
                        () ->
                                CheckpointStorageLoader.load(
                                        null,
                                        new ModernStateBackend(),
                                        new Configuration(),
                                        config,
                                        cl,
                                        LOG))
                .isInstanceOf(DynamicCodeLoadingException.class);

        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, File.class.getName());
        Assertions.assertThatThrownBy(
                        () ->
                                CheckpointStorageLoader.load(
                                        null,
                                        new ModernStateBackend(),
                                        new Configuration(),
                                        config,
                                        cl,
                                        LOG))
                .isInstanceOf(DynamicCodeLoadingException.class);

        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName());
        Assertions.assertThatThrownBy(
                        () ->
                                CheckpointStorageLoader.load(
                                        null,
                                        new ModernStateBackend(),
                                        new Configuration(),
                                        config,
                                        cl,
                                        LOG))
                .isInstanceOf(IllegalConfigurationException.class);
    }

    /**
     * Selecting {@code "jobmanager"} with no further options must make {@code fromConfig} return
     * a {@link JobManagerCheckpointStorage}.
     */
    @Test
    void testLoadJobManagerStorageNoParameters() throws Exception {
        final Configuration config = new Configuration();
        // No (Object) cast: set(ConfigOption<T>, T) must infer T from the option.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");

        final CheckpointStorage storage =
                CheckpointStorageLoader.fromConfig(config, cl, null).get();

        Assertions.assertThat(storage).isInstanceOf(JobManagerCheckpointStorage.class);
    }

    /**
     * Selecting {@code "jobmanager"} together with a savepoint directory must yield a
     * {@link JobManagerCheckpointStorage} whose savepoint path equals the configured directory.
     */
    @Test
    void testLoadJobManagerStorageWithParameters() throws Exception {
        final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final Path expectedSavepointPath = new Path(savepointDir);

        final Configuration config1 = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        config1.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        config1.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

        final CheckpointStorage storage1 =
                CheckpointStorageLoader.fromConfig(config1, cl, null).get();

        Assertions.assertThat(storage1).isInstanceOf(JobManagerCheckpointStorage.class);
        Assertions.assertThat(((JobManagerCheckpointStorage) storage1).getSavepointPath())
                .isEqualTo(expectedSavepointPath);
    }

    /**
     * Passes an application-defined {@link JobManagerCheckpointStorage} while the cluster
     * configuration names {@code "filesystem"} and sets a savepoint directory. The result is
     * expected to still be a {@link JobManagerCheckpointStorage} that keeps its max state size
     * and picks up the savepoint directory from the configuration.
     */
    @Test
    void testConfigureJobManagerStorage() throws Exception {
        final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final Path expectedSavepointPath = new Path(savepointDir);
        // Single source of truth for the size used at construction and in the assertion.
        final int maxSize = 100;

        final Configuration config = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

        final CheckpointStorage storage1 =
                CheckpointStorageLoader.load(
                        new JobManagerCheckpointStorage(maxSize),
                        new ModernStateBackend(),
                        new Configuration(),
                        config,
                        cl,
                        LOG);

        Assertions.assertThat(storage1).isInstanceOf(JobManagerCheckpointStorage.class);
        final JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage1;
        Assertions.assertThat(jmStorage.getSavepointPath())
                .is(HamcrestCondition.matching(normalizedPath(expectedSavepointPath)));
        Assertions.assertThat(jmStorage.getMaxStateSize()).isEqualTo(maxSize);
    }

    /**
     * Passes an application-defined {@link JobManagerCheckpointStorage} while both the cluster
     * and the job configuration set a (different) savepoint directory, then checks which
     * directory the resulting storage reports.
     */
    @Test
    void testConfigureJobManagerStorageWithParameters() throws Exception {
        final String savepointDirCluster =
                new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final String savepointDirJob = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();

        final Configuration clusterConfig = new Configuration();
        final Configuration jobConfig = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        clusterConfig.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDirCluster);
        jobConfig.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDirJob);

        final CheckpointStorage storage =
                CheckpointStorageLoader.load(
                        new JobManagerCheckpointStorage(),
                        new ModernStateBackend(),
                        jobConfig,
                        clusterConfig,
                        cl,
                        LOG);

        Assertions.assertThat(storage).isInstanceOf(JobManagerCheckpointStorage.class);
        final JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage;
        // NOTE(review): this expects the cluster-level directory, whereas
        // testDefaultCheckpointStorage expects job-level settings to win for the checkpoint
        // directory. Confirm against CheckpointStorageLoader.load whether savepointDirJob is the
        // intended expectation here.
        Assertions.assertThat(jmStorage.getSavepointPath())
                .isEqualTo(new Path(savepointDirCluster));
    }

    /**
     * Selects {@code "filesystem"} with checkpoint directory, savepoint directory, small-file
     * threshold and write-buffer size, and checks that the loaded
     * {@link FileSystemCheckpointStorage} reports all four. The expected write-buffer size is the
     * larger of the threshold and the configured buffer size.
     */
    @Test
    void testLoadFileSystemCheckpointStorage() throws Exception {
        final String checkpointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final Path expectedCheckpointsPath = new Path(checkpointDir);
        final Path expectedSavepointsPath = new Path(savepointDir);
        final MemorySize threshold = MemorySize.parse("900kb");
        // Used both for the configuration and for the expected value below.
        final int minWriteBufferSize = 1024;

        final Configuration config1 = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        config1.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config1.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config1.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
        config1.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);

        final CheckpointStorage storage1 =
                CheckpointStorageLoader.fromConfig(config1, cl, null).get();

        Assertions.assertThat(storage1).isInstanceOf(FileSystemCheckpointStorage.class);
        final FileSystemCheckpointStorage fs1 = (FileSystemCheckpointStorage) storage1;
        Assertions.assertThat(fs1.getCheckpointPath())
                .is(HamcrestCondition.matching(normalizedPath(expectedCheckpointsPath)));
        Assertions.assertThat(fs1.getSavepointPath())
                .is(HamcrestCondition.matching(normalizedPath(expectedSavepointsPath)));
        Assertions.assertThat(fs1.getMinFileSizeThreshold()).isEqualTo(threshold.getBytes());
        Assertions.assertThat(fs1.getWriteBufferSize())
                .isEqualTo(Math.max(threshold.getBytes(), minWriteBufferSize));
    }

    /**
     * Passes an application-defined {@link FileSystemCheckpointStorage} (own checkpoint
     * directory, threshold and write-buffer size) while the cluster configuration names
     * {@code "jobmanager"} and sets different values for the same parameters plus a savepoint
     * directory. The application-defined values are expected to be kept; only the savepoint
     * directory, which the application did not set, is expected to come from the configuration.
     */
    @Test
    void testLoadFileSystemCheckpointStorageMixed() throws Exception {
        final Path appCheckpointDir = new Path(TempDirUtils.newFolder(tmp).toURI());
        final String checkpointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
        final Path expectedSavepointsPath = new Path(savepointDir);
        // Application-defined values, reused in the assertions below.
        final int threshold = 1000000;
        final int writeBufferSize = 4000000;

        final FileSystemCheckpointStorage storage =
                new FileSystemCheckpointStorage(appCheckpointDir, threshold, writeBufferSize);

        final Configuration config = new Configuration();
        // No (Object) casts: set(ConfigOption<T>, T) must infer T from the option.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20"));
        config.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000);

        final CheckpointStorage loadedStorage1 =
                CheckpointStorageLoader.load(
                        storage, new ModernStateBackend(), new Configuration(), config, cl, LOG);

        Assertions.assertThat(loadedStorage1).isInstanceOf(FileSystemCheckpointStorage.class);
        final FileSystemCheckpointStorage fs1 = (FileSystemCheckpointStorage) loadedStorage1;
        Assertions.assertThat(fs1.getCheckpointPath())
                .is(HamcrestCondition.matching(normalizedPath(appCheckpointDir)));
        Assertions.assertThat(fs1.getSavepointPath())
                .is(HamcrestCondition.matching(normalizedPath(expectedSavepointsPath)));
        Assertions.assertThat(fs1.getMinFileSizeThreshold()).isEqualTo(threshold);
        Assertions.assertThat(fs1.getWriteBufferSize()).isEqualTo(writeBufferSize);
    }

    /**
     * Runs the shared high-availability scenario with the HA storage directory given as a URI
     * string: once without a checkpoint directory and once with one.
     */
    @Test
    void testHighAvailabilityDefault() throws Exception {
        final File haFolder = TempDirUtils.newFolder(tmp);
        final String haPersistenceDir = new Path(haFolder.toURI()).toString();
        testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);

        final File checkpointFolder = TempDirUtils.newFolder(tmp);
        final Path checkpointPath = new Path(checkpointFolder.toURI().toString());
        testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    /**
     * Runs the shared high-availability scenario with the HA storage directory given as an
     * absolute local path, and with the checkpoint path qualified against the local file system.
     */
    @Test
    void testHighAvailabilityDefaultLocalPaths() throws Exception {
        final File haFolder = TempDirUtils.newFolder(tmp);
        final String haPersistenceDir = new Path(haFolder.getAbsolutePath()).toString();
        testMemoryBackendHighAvailabilityDefault(haPersistenceDir, null);

        final File checkpointFolder = TempDirUtils.newFolder(tmp);
        final Path unqualified = new Path(checkpointFolder.toURI().toString());
        final Path checkpointPath = unqualified.makeQualified(FileSystem.getLocalFileSystem());
        testMemoryBackendHighAvailabilityDefault(haPersistenceDir, checkpointPath);
    }

    /**
     * Shared scenario for the high-availability tests. Builds two cluster configurations with
     * ZooKeeper HA settings (the second additionally names {@code "jobmanager"} as storage),
     * loads a storage from each (the first with an application-defined
     * {@link JobManagerCheckpointStorage}, the second purely from configuration), and asserts
     * that both results are {@link JobManagerCheckpointStorage} instances with no savepoint path.
     *
     * @param haPersistenceDir value written to the HA storage path option
     * @param checkpointPath checkpoint directory to configure, or {@code null} to leave it unset;
     *     when set, both storages must report it, otherwise both must report {@code null}
     */
    private void testMemoryBackendHighAvailabilityDefault(String haPersistenceDir, Path checkpointPath) throws Exception {
        // No (Object) casts below: set(ConfigOption<T>, T) must infer T from the option.
        final Configuration config1 = new Configuration();
        config1.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config1.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        config1.set(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);

        final Configuration config2 = new Configuration();
        config2.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        config2.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config2.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        config2.set(HighAvailabilityOptions.HA_STORAGE_PATH, haPersistenceDir);

        if (checkpointPath != null) {
            final String checkpointDir = checkpointPath.toUri().toString();
            config1.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
            config2.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        }

        final JobManagerCheckpointStorage storage = new JobManagerCheckpointStorage();

        final CheckpointStorage loaded1 =
                CheckpointStorageLoader.load(
                        storage, new ModernStateBackend(), new Configuration(), config1, cl, LOG);
        final CheckpointStorage loaded2 =
                CheckpointStorageLoader.load(
                        null, new ModernStateBackend(), new Configuration(), config2, cl, LOG);

        Assertions.assertThat(loaded1).isInstanceOf(JobManagerCheckpointStorage.class);
        Assertions.assertThat(loaded2).isInstanceOf(JobManagerCheckpointStorage.class);

        final JobManagerCheckpointStorage memStorage1 = (JobManagerCheckpointStorage) loaded1;
        final JobManagerCheckpointStorage memStorage2 = (JobManagerCheckpointStorage) loaded2;

        Assertions.assertThat(memStorage1.getSavepointPath()).isNull();
        Assertions.assertThat(memStorage2.getSavepointPath()).isNull();

        if (checkpointPath != null) {
            Assertions.assertThat(memStorage1.getCheckpointPath())
                    .is(HamcrestCondition.matching(normalizedPath(checkpointPath)));
            Assertions.assertThat(memStorage2.getCheckpointPath())
                    .is(HamcrestCondition.matching(normalizedPath(checkpointPath)));
        } else {
            Assertions.assertThat(memStorage1.getCheckpointPath()).isNull();
            Assertions.assertThat(memStorage2.getCheckpointPath()).isNull();
        }
    }

    /**
     * Creates a matcher that compares paths after both sides have been rebuilt from their string
     * form.
     *
     * @param expected the path to match against; may be {@code null}
     * @return a matcher wrapping {@code expected}
     */
    private static Matcher<Path> normalizedPath(Path expected) {
        final Matcher<Path> matcher = new NormalizedPathMatcher(expected);
        return matcher;
    }

    /**
     * Stub that is both a {@link StateBackend} and a {@link CheckpointStorage}. All factory
     * methods return {@code null}; the tests only use instances for identity comparison.
     */
    static final class LegacyStateBackend implements StateBackend, CheckpointStorage {

        // --- StateBackend ---

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            return null;
        }

        // --- CheckpointStorage ---

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return null;
        }
    }

    /**
     * Minimal {@link CheckpointStorage} stub whose methods return {@code null}. Used as the
     * application-defined storage and as the product of {@code WorkingFactory}.
     */
    static final class MockStorage implements CheckpointStorage {

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            return null;
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException {
            return null;
        }
    }

    /**
     * {@link StateBackend} stub that does not implement {@code CheckpointStorage}. Both factory
     * methods return {@code null}.
     */
    static final class ModernStateBackend implements StateBackend {

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters parameters) throws Exception {
            return null;
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> parameters) throws Exception {
            return null;
        }
    }

    /** Storage factory that ignores its arguments and always produces a new {@link MockStorage}. */
    static final class WorkingFactory implements CheckpointStorageFactory<MockStorage> {

        public MockStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
            final MockStorage created = new MockStorage();
            return created;
        }
    }

    /**
     * Storage factory that always fails with an {@link IllegalConfigurationException}, used to
     * check how factory errors are surfaced.
     */
    static final class FailingFactory implements CheckpointStorageFactory<CheckpointStorage> {

        public CheckpointStorage createFromConfig(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
            final IllegalConfigurationException failure = new IllegalConfigurationException("fail!");
            throw failure;
        }
    }

    /**
     * Hamcrest matcher that compares two {@link Path}s after rebuilding each from its
     * {@code toString()} form, so that only the re-parsed representation is compared.
     */
    private static class NormalizedPathMatcher extends TypeSafeMatcher<Path> {

        /** Expected path rebuilt from its string form, or {@code null} if none was given. */
        private final Path reNormalizedExpected;

        private NormalizedPathMatcher(Path expected) {
            if (expected == null) {
                this.reNormalizedExpected = null;
            } else {
                this.reNormalizedExpected = new Path(expected.toString());
            }
        }

        protected boolean matchesSafely(Path actual) {
            if (this.reNormalizedExpected != null) {
                // Rebuild the actual path the same way the expected one was rebuilt.
                return this.reNormalizedExpected.equals(new Path(actual.toString()));
            }
            return actual == null;
        }

        public void describeTo(Description description) {
            description.appendValue(this.reNormalizedExpected);
        }
    }
}

