/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.utils.JobResultRetriever;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SavepointDeepCopyTest
extends AbstractTestBaseJUnit4 {
    private static final MemorySize FILE_STATE_SIZE_THRESHOLD = new MemorySize(1L);
    private static final String TEXT = "The quick brown fox jumps over the lazy dog";
    private static final String RANDOM_VALUE = RandomStringUtils.randomAlphanumeric((int)120);
    private final StateBackend backend;

    public SavepointDeepCopyTest(StateBackend backend) throws Exception {
        this.backend = backend;
    }

    @Parameterized.Parameters(name="State Backend: {0}")
    public static Collection<StateBackend> data() {
        return Arrays.asList(new HashMapStateBackend(), new EmbeddedRocksDBStateBackend());
    }

    @Test
    public void testSavepointDeepCopy() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource words = env.fromData((Object[])TEXT.split(" "));
        StateBootstrapTransformation transformation = OperatorTransformation.bootstrapWith((DataStream)words).keyBy((KeySelector & Serializable)e -> e).transform((KeyedStateBootstrapFunction)new WordMapBootstrapper());
        File savepointUrl1 = this.createAndRegisterTempFile(new AbstractID().toHexString());
        String savepointPath1 = savepointUrl1.getPath();
        SavepointWriter.newSavepoint((StreamExecutionEnvironment)env, (StateBackend)this.backend, (int)128).withConfiguration(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)FILE_STATE_SIZE_THRESHOLD).withOperator(OperatorIdentifier.forUid((String)"Operator1"), transformation).write(savepointPath1);
        env.execute("bootstrap savepoint1");
        Set<String> stateFiles1 = SavepointDeepCopyTest.getFileNamesInDirectory(Paths.get(savepointPath1, new String[0]));
        Assert.assertTrue((String)"Failed to bootstrap savepoint1 with additional state files", (stateFiles1.size() > 1 ? 1 : 0) != 0);
        File savepointUrl2 = this.createAndRegisterTempFile(new AbstractID().toHexString());
        String savepointPath2 = savepointUrl2.getPath();
        SavepointWriter savepoint2 = SavepointWriter.fromExistingSavepoint((StreamExecutionEnvironment)env, (String)savepointPath1, (StateBackend)this.backend).withConfiguration(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)FILE_STATE_SIZE_THRESHOLD);
        savepoint2.withOperator(OperatorIdentifier.forUid((String)"Operator2"), transformation).write(savepointPath2);
        env.execute("create savepoint2");
        Set<String> stateFiles2 = SavepointDeepCopyTest.getFileNamesInDirectory(Paths.get(savepointPath1, new String[0]));
        Assert.assertTrue((String)"Failed to create savepoint2 from savepoint1 with additional state files", (stateFiles2.size() > 1 ? 1 : 0) != 0);
        Assert.assertThat((String)"At least one state file in savepoint1 are not in savepoint2", stateFiles1, (Matcher)Matchers.everyItem((Matcher)Matchers.isIn(stateFiles2)));
        long actuallyKeyNum = JobResultRetriever.collect(SavepointReader.read((StreamExecutionEnvironment)env, (String)savepointPath2, (StateBackend)this.backend).readKeyedState(OperatorIdentifier.forUid((String)"Operator1"), (KeyedStateReaderFunction)new ReadFunction())).size();
        long expectedKeyNum = Arrays.stream(TEXT.split(" ")).distinct().count();
        Assert.assertEquals((String)"Unexpected number of keys in the state of Operator1", (long)expectedKeyNum, (long)actuallyKeyNum);
    }

    private static Set<String> getFileNamesInDirectory(Path path) throws IOException {
        try (Stream<Path> files = Files.list(path);){
            Set<String> set = files.map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            return set;
        }
    }

    static class WordMapBootstrapper
    extends KeyedStateBootstrapFunction<String, String> {
        private ValueState<Tuple2<String, String>> state;

        WordMapBootstrapper() {
        }

        public void open(OpenContext openContext) {
            ValueStateDescriptor descriptor = new ValueStateDescriptor("state", Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.STRING, Types.STRING}));
            this.state = this.getRuntimeContext().getState(descriptor);
        }

        public void processElement(String value, KeyedStateBootstrapFunction.Context ctx) throws Exception {
            if (this.state.value() == null) {
                this.state.update((Object)new Tuple2((Object)value, (Object)RANDOM_VALUE));
            }
        }
    }

    static class ReadFunction
    extends KeyedStateReaderFunction<String, Tuple2<String, String>> {
        private ValueState<Tuple2<String, String>> state;

        ReadFunction() {
        }

        public void open(OpenContext openContext) {
            ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("state", Types.TUPLE((TypeInformation[])new TypeInformation[]{Types.STRING, Types.STRING}));
            this.state = this.getRuntimeContext().getState(stateDescriptor);
        }

        public void readKey(String key, KeyedStateReaderFunction.Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
            out.collect((Object)((Tuple2)this.state.value()));
        }
    }
}

