/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.api;

import java.io.Serializable;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.SavepointWriter;
import org.apache.flink.state.api.StateBootstrapTransformation;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public class SavepointWriterUidModificationITCase {
    /** Mini cluster shared by all tests: one task manager offering four slots. */
    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    /** Values bootstrapped into (and expected back from) the first operator. */
    private static final Collection<Integer> STATE_1 = Arrays.asList(1, 2, 3);

    /** Values bootstrapped into (and expected back from) the second operator. */
    private static final Collection<Integer> STATE_2 = Arrays.asList(4, 5, 6);

    /**
     * Descriptor of the value state that {@link StateBootstrapper} writes and {@link StateReader}
     * reads. The diamond keeps the descriptor type-checked instead of relying on a raw constructor
     * call.
     */
    private static final ValueStateDescriptor<Integer> STATE_DESCRIPTOR =
            new ValueStateDescriptor<>("number", Types.INT);

    /**
     * Bootstraps {@link #STATE_1} for an operator identified only by a generated uid hash, changes
     * that identifier to a uid, and validates the rewritten savepoint against the uid.
     */
    @Test
    public void testAddUid(@TempDir Path tmp) throws Exception {
        final String uidHash = new AbstractID().toHexString();
        final String uid = "uid";

        final String originalSavepoint =
                bootstrapState(
                        tmp,
                        (env, writer) ->
                                writer.withOperator(
                                        OperatorIdentifier.forUidHash(uidHash),
                                        bootstrap(env, STATE_1)));

        final String newSavepoint =
                modifySavepoint(
                        tmp,
                        originalSavepoint,
                        writer ->
                                writer.changeOperatorIdentifier(
                                        OperatorIdentifier.forUidHash(uidHash),
                                        OperatorIdentifier.forUid(uid)));

        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, uid, null));
    }

    /**
     * Bootstraps {@link #STATE_1} under one uid, changes the operator identifier to a different
     * uid, and validates the rewritten savepoint against the new uid.
     */
    @Test
    public void testChangeUid(@TempDir Path tmp) throws Exception {
        final String uid = "uid";
        final String newUid = "fabulous";

        final String originalSavepoint =
                bootstrapState(
                        tmp,
                        (env, writer) ->
                                writer.withOperator(
                                        OperatorIdentifier.forUid(uid),
                                        bootstrap(env, STATE_1)));

        final String newSavepoint =
                modifySavepoint(
                        tmp,
                        originalSavepoint,
                        writer ->
                                writer.changeOperatorIdentifier(
                                        OperatorIdentifier.forUid(uid),
                                        OperatorIdentifier.forUid(newUid)));

        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, newUid, null));
    }

    /**
     * Bootstraps {@link #STATE_1} under a uid, changes the operator identifier to a generated uid
     * hash, and validates the rewritten savepoint against that hash (with no uid expected).
     */
    @Test
    public void testChangeUidHashOnly(@TempDir Path tmp) throws Exception {
        final String uid = "uid";
        final String newUidHash = new AbstractID().toHexString();

        final String originalSavepoint =
                bootstrapState(
                        tmp,
                        (env, writer) ->
                                writer.withOperator(
                                        OperatorIdentifier.forUid(uid),
                                        bootstrap(env, STATE_1)));

        final String newSavepoint =
                modifySavepoint(
                        tmp,
                        originalSavepoint,
                        writer ->
                                writer.changeOperatorIdentifier(
                                        OperatorIdentifier.forUid(uid),
                                        OperatorIdentifier.forUidHash(newUidHash)));

        runAndValidate(newSavepoint, ValidationParameters.of(STATE_1, null, newUidHash));
    }

    /**
     * Bootstraps {@link #STATE_1} under {@code uid1} and {@link #STATE_2} under {@code uid2},
     * swaps the two identifiers in a single modification, and validates that each state is now
     * found under the other uid.
     */
    @Test
    public void testSwapUid(@TempDir Path tmp) throws Exception {
        final String uid1 = "uid1";
        final String uid2 = "uid2";

        final String originalSavepoint =
                bootstrapState(
                        tmp,
                        (env, writer) ->
                                writer.withOperator(
                                                OperatorIdentifier.forUid(uid1),
                                                bootstrap(env, STATE_1))
                                        .withOperator(
                                                OperatorIdentifier.forUid(uid2),
                                                bootstrap(env, STATE_2)));

        final String newSavepoint =
                modifySavepoint(
                        tmp,
                        originalSavepoint,
                        writer ->
                                writer.changeOperatorIdentifier(
                                                OperatorIdentifier.forUid(uid1),
                                                OperatorIdentifier.forUid(uid2))
                                        .changeOperatorIdentifier(
                                                OperatorIdentifier.forUid(uid2),
                                                OperatorIdentifier.forUid(uid1)));

        runAndValidate(
                newSavepoint,
                ValidationParameters.of(STATE_1, uid2, null),
                ValidationParameters.of(STATE_2, uid1, null));
    }

    /**
     * Writes a brand-new savepoint into a uniquely named location below {@code tmp}.
     *
     * @param tmp directory under which the savepoint location is resolved
     * @param mutator callback that receives the environment and the fresh writer so the caller can
     *     register operators before the savepoint is written
     * @return absolute path of the written savepoint, as a string
     * @throws Exception if writing the savepoint or executing the job fails
     */
    private static String bootstrapState(Path tmp, BiConsumer<StreamExecutionEnvironment, SavepointWriter> mutator) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // NOTE(review): 128 is presumably the savepoint's max parallelism - confirm against
        // SavepointWriter#newSavepoint.
        final SavepointWriter savepointWriter = SavepointWriter.newSavepoint(environment, 128);
        mutator.accept(environment, savepointWriter);

        final Path target = tmp.resolve(new AbstractID().toHexString()).toAbsolutePath();
        final String location = target.toString();
        savepointWriter.write(location);

        environment.execute("Bootstrap");
        return location;
    }

    /**
     * Builds a bootstrap transformation that feeds {@code data} through a {@link
     * StateBootstrapper}, keyed by the element value itself.
     *
     * <p>The call chain is fully generic: no raw-type casts are needed because the lambda is
     * typed by the {@code keyBy} parameter and {@code StateBootstrapper} already matches the
     * expected function type.
     *
     * @param env environment used to create the input stream
     * @param data values to bootstrap
     * @return the transformation to register with a savepoint writer
     */
    private static StateBootstrapTransformation<Integer> bootstrap(StreamExecutionEnvironment env, Collection<Integer> data) {
        return OperatorTransformation.bootstrapWith(env.fromData(data))
                .keyBy(v -> v)
                .transform(new StateBootstrapper());
    }

    /**
     * Loads an existing savepoint, lets the caller modify it, and writes the result to a new,
     * uniquely named location below {@code tmp}.
     *
     * @param tmp directory under which the new savepoint location is resolved
     * @param savepointPath location of the savepoint to start from
     * @param mutator callback that applies the desired changes to the writer
     * @return absolute path of the newly written savepoint, as a string
     * @throws Exception if loading, writing, or executing the job fails
     */
    private static String modifySavepoint(Path tmp, String savepointPath, Consumer<SavepointWriter> mutator) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        final SavepointWriter savepointWriter =
                SavepointWriter.fromExistingSavepoint(environment, savepointPath);
        mutator.accept(savepointWriter);

        final Path target = tmp.resolve(new AbstractID().toHexString()).toAbsolutePath();
        final String location = target.toString();
        savepointWriter.write(location);

        environment.execute("Modifying");
        return location;
    }

    /**
     * Validates a savepoint in two steps.
     *
     * <ol>
     *   <li>Metadata: the savepoint must contain exactly one operator state per expectation. For
     *       an expectation with a uid, exactly one operator state must carry that uid and its
     *       operator id must equal the id derived from the uid. Otherwise exactly one operator
     *       state must have the expected uid hash as its hex operator id and must carry no uid.
     *   <li>Restore: a job with one {@link StateReader} per expectation is started from the
     *       savepoint and each reader's output must match the expected state values (in any
     *       order).
     * </ol>
     *
     * <p>The result iterators are closed on every exit path, including assertion failures. A
     * failure while closing is attached to the primary failure as a suppressed exception rather
     * than replacing it.
     *
     * @param savepointPath location of the savepoint to validate
     * @param validationParameters one expectation per operator contained in the savepoint
     * @throws Exception if loading the metadata, running the job, or closing an iterator fails
     */
    private static void runAndValidate(String savepointPath, ValidationParameters ... validationParameters) throws Exception {
        CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
        Collection<OperatorState> operatorStates = metadata.getOperatorStates();
        Assertions.assertThat(operatorStates.size()).isEqualTo(validationParameters.length);

        for (ValidationParameters expected : validationParameters) {
            String expectedUid = expected.getUid();
            if (expectedUid != null) {
                Set<OperatorState> operators =
                        operatorStates.stream()
                                .filter(os -> os.getOperatorUid().filter(expectedUid::equals).isPresent())
                                .collect(Collectors.toSet());
                Assertions.assertThat(operators.size()).isEqualTo(1);
                Assertions.assertThat(operators.iterator().next().getOperatorID())
                        .isEqualTo(OperatorIdentifier.forUid(expectedUid).getOperatorId());
            } else {
                Set<OperatorState> operators =
                        operatorStates.stream()
                                .filter(os -> os.getOperatorID().toHexString().equals(expected.getUidHash()))
                                .collect(Collectors.toSet());
                Assertions.assertThat(operators.size()).isEqualTo(1);
                Assertions.assertThat(operators.iterator().next().getOperatorUid()).isEmpty();
            }
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<CloseableIterator<Integer>> iterators = new ArrayList<>(validationParameters.length);

        // The pseudo-resource closes whatever iterators exist when the block is left, whether
        // normally or through a failure; try-with-resources keeps the primary failure intact.
        try (AutoCloseable ignored = () -> closeAll(iterators)) {
            for (ValidationParameters expected : validationParameters) {
                SingleOutputStreamOperator<Integer> stream =
                        env.fromData(expected.getState()).keyBy(v -> v).map(new StateReader());
                if (expected.getUid() != null) {
                    iterators.add(stream.uid(expected.getUid()).collectAsync());
                } else {
                    iterators.add(stream.setUidHash(expected.getUidHash()).collectAsync());
                }
            }

            StreamGraph streamGraph = env.getStreamGraph();
            streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
            env.executeAsync(streamGraph);

            for (int i = 0; i < validationParameters.length; ++i) {
                // Typed as a plain Iterator so the Iterator overload of assertThat is selected.
                Iterator<Integer> actual = iterators.get(i);
                Assertions.assertThat(actual)
                        .toIterable()
                        .containsExactlyInAnyOrderElementsOf(validationParameters[i].getState());
            }
        }
    }

    /**
     * Closes every given resource, even if closing an earlier one fails.
     *
     * @param resources resources to close
     * @throws Exception the first close failure, with any later ones attached as suppressed
     */
    private static void closeAll(Collection<? extends AutoCloseable> resources) throws Exception {
        Exception firstFailure = null;
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Expectation for a single operator: the state values it should hold and the identifier (uid
     * or uid hash) it should be found under.
     */
    private static class ValidationParameters {
        private final Collection<Integer> state;
        private final String uid;
        private final String uidHash;

        /** Factory shorthand for {@link #ValidationParameters(Collection, String, String)}. */
        public static ValidationParameters of(Collection<Integer> state, String uid, String uidHash) {
            return new ValidationParameters(state, uid, uidHash);
        }

        /**
         * @param state expected state values
         * @param uid expected operator uid; may be {@code null}
         * @param uidHash expected operator uid hash; may be {@code null}
         */
        public ValidationParameters(Collection<Integer> state, String uid, String uidHash) {
            this.uidHash = uidHash;
            this.uid = uid;
            this.state = state;
        }

        /** Returns the expected state values. */
        public Collection<Integer> getState() {
            return state;
        }

        /** Returns the expected uid, or {@code null} if none was given. */
        public String getUid() {
            return uid;
        }

        /** Returns the expected uid hash, or {@code null} if none was given. */
        public String getUidHash() {
            return uidHash;
        }
    }

    public static class StateBootstrapper
    extends KeyedStateBootstrapFunction<Integer, Integer> {
        private transient ValueState<Integer> state;

        public void open(OpenContext openContext) {
            this.state = this.getRuntimeContext().getState(STATE_DESCRIPTOR);
        }

        public void processElement(Integer value, KeyedStateBootstrapFunction.Context ctx) throws Exception {
            this.state.update((Object)value);
        }
    }

    public static class StateReader
    extends RichMapFunction<Integer, Integer> {
        private transient ValueState<Integer> state;

        public void open(OpenContext openContext) {
            this.state = this.getRuntimeContext().getState(STATE_DESCRIPTOR);
        }

        public Integer map(Integer value) throws Exception {
            return (Integer)this.state.value();
        }
    }
}

