/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.queryablestate.itcases;

import com.esotericsoftware.kryo.Serializer;
import java.io.File;
import java.io.IOException;
import java.net.URLClassLoader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public abstract class AbstractQueryableStateTestBase {
    // Upper bound for a single test; the tests build their Deadline from it.
    private static final Duration TEST_TIMEOUT = Duration.ofSeconds(200L);
    // Pause length in milliseconds.
    // NOTE(review): presumably the wait between query retries (the polling loops sleep 50 ms) - confirm.
    private static final long RETRY_TIMEOUT = 50L;
    // JUnit 5 extension that supplies a scheduled thread pool with 4 threads.
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newScheduledThreadPool(4));
    // Adapter around the extension's executor; the tests hand it to getKvState(...).
    private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
    // Execution environment; re-created through createEnv() in setUp().
    private StreamExecutionEnvironment env;
    // Client the tests use to query state.
    // NOTE(review): never assigned in the visible code - presumably set by subclasses; confirm.
    protected static QueryableStateClient client;
    // Client the tests use to submit jobs; setUp() asserts that it is non-null.
    // NOTE(review): never assigned in the visible code - presumably set by subclasses; confirm.
    protected static ClusterClient<?> clusterClient;
    // Parallelism the tests apply to env; setUp() sets it to 4.
    protected static int maxParallelism;
    // Temporary directory injected by JUnit.
    // NOTE(review): its use is outside the visible part of the file - confirm.
    @TempDir
    static File classloaderFolder;

    /**
     * Prepares the fixture before every test: obtains a fresh environment from
     * {@link #createEnv()}, verifies that the shared cluster client is available and
     * sets the maximum parallelism to 4.
     *
     * @throws Exception if the environment cannot be created
     */
    @BeforeEach
    void setUp() throws Exception {
        env = createEnv();
        Assertions.assertThat(clusterClient).isNotNull();
        maxParallelism = 4;
    }

    /**
     * Creates the execution environment used by a single test; invoked from
     * {@code setUp()} before each test method.
     *
     * @return the environment the test builds its topology on
     * @throws Exception if the environment cannot be created
     */
    protected abstract StreamExecutionEnvironment createEnv() throws Exception;

    /**
     * Publishes a keyed reducing state under the name {@code "hakuna-matata"} and queries
     * every key in {@code [0, numKeys)} repeatedly until each one has reported a count
     * greater than zero or the deadline expires.
     *
     * <p>The per-response check is part of the futures that are awaited, so a failing
     * assertion inside the callback fails the test instead of being dropped, and the
     * {@code counts} update is guaranteed to have happened before the next round.
     *
     * @throws Exception if the job cannot be submitted or a query fails or times out
     */
    @Test
    void testQueryableState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final int numKeys = 256;
        final String queryName = "hakuna-matata";

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source = this.env.addSource(new TestKeyRangeSource(numKeys));
        final ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
                new ReducingStateDescriptor<>("any-name", new SumReduce(), source.getType());

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 7143749578983540352L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState(queryName, reducingState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            final AtomicLongArray counts = new AtomicLongArray(numKeys);
            final ArrayList<CompletableFuture<Void>> futures = new ArrayList<>(numKeys);

            boolean allNonZero = false;
            while (!allNonZero && deadline.hasTimeLeft()) {
                allNonZero = true;
                futures.clear();

                for (int i = 0; i < numKeys; i++) {
                    final int key = i;
                    if (counts.get(key) > 0L) {
                        // This key has already reported a count; no need to query it again.
                        continue;
                    }
                    allNonZero = false;

                    CompletableFuture<ReducingState<Tuple2<Integer, Long>>> result = getKvState(
                            deadline, client, jobId, queryName, key, BasicTypeInfo.INT_TYPE_INFO, reducingState, false, this.executor);

                    // Await the dependent stage (not just the raw query) so that assertion
                    // failures propagate and counts is updated before the next round.
                    futures.add(result.thenAccept(response -> Assertions.assertThatCode(() -> {
                        Tuple2<Integer, Long> res = response.get();
                        counts.set(key, res.f1);
                        Assertions.assertThat(key).withFailMessage("Key mismatch").isEqualTo(res.f0.intValue());
                    }).doesNotThrowAnyException()));
                }

                CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                        .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            }

            // withFailMessage must precede the assertion it describes, otherwise it is ignored.
            Assertions.assertThat(allNonZero).withFailMessage("Not all keys are non-zero").isTrue();

            for (int i = 0; i < numKeys; i++) {
                long count = counts.get(i);
                Assertions.assertThat(count).withFailMessage("Count at position " + i + " is " + count).isGreaterThan(0L);
            }
        }
    }

    /**
     * Registers two queryable states under the same name {@code "duplicate-me"} and
     * verifies that the job result carries a failure whose stringified form mentions the
     * duplicate registration.
     *
     * @throws Exception if submitting the job or retrieving its result fails
     */
    @Test
    void testDuplicateRegistrationFailsJob() throws Exception {
        final int numKeys = 256;
        final String queryName = "duplicate-me";

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source = this.env.addSource(new TestKeyRangeSource(numKeys));
        final ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
                new ReducingStateDescriptor<>("any-name", new SumReduce(), source.getType());

        // First registration, with an explicit reducing-state descriptor.
        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = -4126824763829132959L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState(queryName, reducingState);

        // Second registration under the very same name.
        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = -6265024000462809436L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState(queryName);

        final JobGraph jobGraph = this.env.getStreamGraph().getJobGraph();

        clusterClient
                .submitJob(jobGraph)
                .thenCompose(submittedJobId -> clusterClient.requestJobResult(submittedJobId))
                .thenApply(JobResult::getSerializedThrowable)
                .thenAccept(serializedThrowable -> {
                    Assertions.assertThat(serializedThrowable).isPresent();
                    Throwable failure = serializedThrowable.get().deserializeError(this.getClass().getClassLoader());
                    String failureCause = ExceptionUtils.stringifyException(failure);
                    Assertions.assertThat(failureCause)
                            .contains("KvState with name 'duplicate-me' has already been registered by another operator");
                })
                .get();
    }

    /**
     * Publishes a keyed value state under the name {@code "hakuna"} and checks it through
     * {@code executeValueQuery} with an expected value of {@code numElements}.
     *
     * @throws Exception if the job cannot be submitted or the query fails
     */
    @Test
    void testValueState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));
        final ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
                new ValueStateDescriptor<>("any", source.getType());

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 7662520075515707428L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState("hakuna", valueState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();

            clusterClient.submitJob(jobGraph).get();
            this.executeValueQuery(deadline, client, jobId, "hakuna", valueState, numElements);
        }
    }

    /**
     * Queries a value state when a Kryo serializer class that exists only in a user class
     * loader is registered as the default serializer for {@link Byte}.
     *
     * <p>The job graph receives the loader's URLs as classpath. The query client uses the
     * user class loader only for the duration of the query; it is reset to {@code null}
     * afterwards even if the query fails.
     *
     * @throws Exception if class loading, job submission or the query fails
     */
    @Test
    void testCustomKryoSerializerHandling() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1L;
        final String stateName = "teriberka";
        final String customSerializerClassName = "CustomKryo";

        final URLClassLoader userClassLoader =
                AbstractQueryableStateTestBase.createLoaderWithCustomKryoSerializer(customSerializerClassName);

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        // The cast is purely a compile-time matter; the loaded class is passed on unchanged.
        @SuppressWarnings("unchecked")
        final Class<? extends Serializer<?>> kryoSerializerClass =
                (Class<? extends Serializer<?>>) userClassLoader.loadClass(customSerializerClassName);
        ((SerializerConfigImpl) this.env.getConfig().getSerializerConfig())
                .addDefaultKryoSerializer(Byte.class, kryoSerializerClass);

        @SuppressWarnings({"unchecked", "rawtypes"})
        final ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
                new ValueStateDescriptor("any", (TypeInformation) new GenericTypeInfo(Tuple2.class));

        this.env.addSource(new TestAscendingValueSource(numElements))
                .keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    private static final long serialVersionUID = 7662520075515707428L;

                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        return value.f0;
                    }
                })
                .asQueryableState(stateName, valueState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            jobGraph.setClasspaths(Arrays.asList(userClassLoader.getURLs()));

            clusterClient.submitJob(jobGraph).get();

            try {
                client.setUserClassLoader(userClassLoader);
                this.executeValueQuery(deadline, client, jobId, stateName, valueState, numElements);
            } finally {
                // Always detach the user class loader from the shared client again.
                client.setUserClassLoader(null);
            }
        }
    }

    /**
     * Loads the class named {@code "CustomKryo"} from the given class loader and creates an
     * instance of it through its no-argument constructor.
     *
     * <p>Uses {@code getDeclaredConstructor().newInstance()} instead of the deprecated
     * {@code Class.newInstance()}, which rethrows checked constructor exceptions without
     * declaring them. A missing no-argument constructor or an exception thrown by the
     * constructor is reported as an {@link InstantiationException} with the original
     * exception attached as its cause, so the declared exception types stay the same.
     *
     * @param userClassLoader class loader that can resolve {@code "CustomKryo"}
     * @param <T> serializer type expected by the caller
     * @return a new instance of the loaded serializer class
     * @throws ClassNotFoundException if the class loader cannot find the class
     * @throws IllegalAccessException if the constructor is not accessible
     * @throws InstantiationException if the class cannot be instantiated
     */
    <T extends Serializer<Byte>> T createSerializer(ClassLoader userClassLoader) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<?> customSerializerClass = userClassLoader.loadClass("CustomKryo");
        try {
            @SuppressWarnings("unchecked")
            T serializer = (T) customSerializerClass.getDeclaredConstructor().newInstance();
            return serializer;
        } catch (NoSuchMethodException | java.lang.reflect.InvocationTargetException e) {
            InstantiationException failure =
                    new InstantiationException("Could not instantiate " + customSerializerClass.getName());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Waits until the submitted job reports {@link JobStatus#RUNNING} and then checks the
     * error returned for a query with an unknown job id and for a query with an unknown
     * queryable state name. Currently disabled.
     *
     * @throws Exception if the job cannot be submitted or its status cannot be retrieved
     */
    @Test
    @Disabled
    void testWrongJobIdAndWrongQueryableStateName() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));
        final ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
                new ValueStateDescriptor<>("any", source.getType());

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 7662520075515707428L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState("hakuna", valueState);

        try (AutoCancellableJob closableJobGraph = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            clusterClient.submitJob(closableJobGraph.getJobGraph()).get();

            // Poll the job status until the job is running or the deadline has passed.
            CompletableFuture<JobStatus> jobStatusFuture = clusterClient.getJobStatus(closableJobGraph.getJobId());
            while (deadline.hasTimeLeft()
                    && !jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).equals(JobStatus.RUNNING)) {
                Thread.sleep(RETRY_TIMEOUT);
                jobStatusFuture = clusterClient.getJobStatus(closableJobGraph.getJobId());
            }
            Assertions.assertThat(jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS))
                    .isEqualTo(JobStatus.RUNNING);

            // Query with a job id that was never submitted.
            final JobID wrongJobId = new JobID();
            final CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownJobFuture =
                    client.getKvState(wrongJobId, "hakuna", 0, BasicTypeInfo.INT_TYPE_INFO, valueState);
            Assertions.assertThatThrownBy(() -> unknownJobFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS))
                    .isInstanceOf(ExecutionException.class)
                    .cause()
                    .isInstanceOf(RuntimeException.class)
                    .hasMessage("FlinkJobNotFoundException: Could not find Flink job (" + wrongJobId + ")");

            // Query the real job with a state name that was never registered.
            final CompletableFuture<ValueState<Tuple2<Integer, Long>>> unknownQSName =
                    client.getKvState(closableJobGraph.getJobId(), "wrong-hakuna", 0, BasicTypeInfo.INT_TYPE_INFO, valueState);
            Assertions.assertThatThrownBy(() -> unknownQSName.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS))
                    .isInstanceOf(ExecutionException.class)
                    .cause()
                    .isInstanceOf(RuntimeException.class)
                    .hasMessage("UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'wrong-hakuna'.");
        }
    }

    /**
     * Issues a query before the job has been submitted (the returned future is ignored),
     * then submits the job and checks the value state through {@code executeValueQuery}.
     *
     * @throws Exception if the job cannot be submitted or the query fails
     */
    @Test
    void testQueryNonStartedJobState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));
        // The descriptor is created with an explicit null default value.
        final ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
                new ValueStateDescriptor<>("any", source.getType(), null);

        final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
                source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    private static final long serialVersionUID = 7480503339992214681L;

                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        return value.f0;
                    }
                }).asQueryableState("hakuna", valueState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            final long expected = numElements;

            // Query while the job has not been submitted yet; the outcome is not inspected.
            client.getKvState(
                    autoCancellableJob.getJobId(),
                    queryableState.getQueryableStateName(),
                    0,
                    BasicTypeInfo.INT_TYPE_INFO,
                    valueState);

            clusterClient.submitJob(jobGraph).get();
            this.executeValueQuery(deadline, client, jobId, "hakuna", valueState, expected);
        }
    }

    /**
     * Keys every record by the constant {@code 1}, queries key {@code 0}, and expects the
     * query to fail with an {@link UnknownKeyOrNamespaceException} as cause, even though
     * the descriptor declares a default value.
     *
     * @throws Throwable if the job cannot be submitted or the assertion fails
     */
    @Test
    void testValueStateDefault() throws Throwable {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));
        final ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
                new ValueStateDescriptor<>("any", source.getType(), Tuple2.of(0, 1337L));

        final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
                source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    private static final long serialVersionUID = 4509274556892655887L;

                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        // Every record is assigned to key 1, so key 0 never receives any state.
                        return 1;
                    }
                }).asQueryableState("hakuna", valueState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            final int key = 0;
            final CompletableFuture<ValueState<Tuple2<Integer, Long>>> future = getKvState(
                    deadline,
                    client,
                    jobId,
                    queryableState.getQueryableStateName(),
                    key,
                    BasicTypeInfo.INT_TYPE_INFO,
                    valueState,
                    true,
                    this.executor);

            Assertions.assertThatThrownBy(() -> future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS))
                    .cause()
                    .isInstanceOf(UnknownKeyOrNamespaceException.class);
        }
    }

    /**
     * Publishes a value state through the name-only {@code asQueryableState("matata")}
     * shortcut, takes the state descriptor from the returned stream and checks the state
     * through {@code executeValueQuery}.
     *
     * @throws Exception if the job cannot be submitted or the query fails
     */
    @Test
    void testValueStateShortcut() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));

        final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState =
                source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    private static final long serialVersionUID = 9168901838808830068L;

                    @Override
                    public Integer getKey(Tuple2<Integer, Long> value) {
                        return value.f0;
                    }
                }).asQueryableState("matata");

        // The shortcut builds the descriptor itself; fetch it from the stream for the query.
        final ValueStateDescriptor<Tuple2<Integer, Long>> stateDesc =
                (ValueStateDescriptor<Tuple2<Integer, Long>>) queryableState.getStateDescriptor();

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();

            clusterClient.submitJob(jobGraph).get();
            this.executeValueQuery(deadline, client, jobId, "matata", stateDesc, numElements);
        }
    }

    /**
     * Publishes a keyed reducing state under the name {@code "jungle"} and polls every key
     * in {@code [0, maxParallelism)} until the reduced value equals
     * {@code numElements * (numElements + 1) / 2} (524800 for 1024 elements) or the
     * deadline expires.
     *
     * <p>The custom failure messages are set before the assertion they belong to, because
     * AssertJ ignores a message that is configured after the assertion has already run.
     *
     * @throws Exception if the job cannot be submitted or a query fails or times out
     */
    @Test
    void testReducingState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));
        final ReducingStateDescriptor<Tuple2<Integer, Long>> reducingState =
                new ReducingStateDescriptor<>("any", new SumReduce(), source.getType());

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 8470749712274833552L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).asQueryableState("jungle", reducingState);

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            // Sum of 0..numElements.
            final long expected = numElements * (numElements + 1L) / 2L;

            for (int key = 0; key < maxParallelism; key++) {
                boolean success = false;
                while (deadline.hasTimeLeft() && !success) {
                    CompletableFuture<ReducingState<Tuple2<Integer, Long>>> future = getKvState(
                            deadline, client, jobId, "jungle", key, BasicTypeInfo.INT_TYPE_INFO, reducingState, false, this.executor);

                    Tuple2<Integer, Long> value =
                            future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();

                    Assertions.assertThat(key).withFailMessage("Key mismatch").isEqualTo(value.f0.intValue());

                    if (expected == value.f1) {
                        success = true;
                    } else {
                        // Not all elements have been reduced yet; retry after a short pause.
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }

                Assertions.assertThat(success).withFailMessage("Did not succeed query").isTrue();
            }
        }
    }

    /**
     * Accumulates the values of each key in a map state that is published under the name
     * {@code "timon-queryable"} and polls every key in {@code [0, maxParallelism)} until
     * the accumulated value equals {@code numElements * (numElements + 1) / 2} (524800
     * for 1024 elements) or the deadline expires.
     *
     * <p>The custom failure messages are set before the assertion they belong to, because
     * AssertJ ignores a message that is configured after the assertion has already run.
     *
     * @throws Exception if the job cannot be submitted or a query fails or times out
     */
    @Test
    void testMapState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));

        final MapStateDescriptor<Integer, Tuple2<Integer, Long>> mapStateDescriptor =
                new MapStateDescriptor<>("timon", BasicTypeInfo.INT_TYPE_INFO, source.getType());
        mapStateDescriptor.setQueryable("timon-queryable");

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 8470749712274833552L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).process(new KeyedProcessFunction<Integer, Tuple2<Integer, Long>, Object>() {
            private static final long serialVersionUID = -805125545438296619L;
            private transient MapState<Integer, Tuple2<Integer, Long>> mapState;

            @Override
            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.mapState = this.getRuntimeContext().getMapState(mapStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
                Tuple2<Integer, Long> v = this.mapState.get(value.f0);
                if (v == null) {
                    // First element for this map key: start summing from zero.
                    v = new Tuple2<>(value.f0, 0L);
                }
                this.mapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
            }
        });

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            // Sum of 0..numElements.
            final long expected = numElements * (numElements + 1L) / 2L;

            for (int key = 0; key < maxParallelism; key++) {
                boolean success = false;
                while (deadline.hasTimeLeft() && !success) {
                    CompletableFuture<MapState<Integer, Tuple2<Integer, Long>>> future = getKvState(
                            deadline, client, jobId, "timon-queryable", key, BasicTypeInfo.INT_TYPE_INFO, mapStateDescriptor, false, this.executor);

                    Tuple2<Integer, Long> value =
                            future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get(key);

                    if (value != null && value.f0 != null && expected == value.f1) {
                        Assertions.assertThat(key).withFailMessage("Key mismatch").isEqualTo(value.f0.intValue());
                        success = true;
                    } else {
                        // State not complete yet; retry after a short pause.
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }

                Assertions.assertThat(success).withFailMessage("Did not succeed query").isTrue();
            }
        }
    }

    /**
     * Appends the value of every record to a list state that is published under the name
     * {@code "list-queryable"} and polls every key in {@code [0, maxParallelism)} until the
     * state holds {@code numElements + 1} distinct values or the deadline expires.
     * Afterwards it verifies that each collected set contains every value in
     * {@code [0, numElements]}.
     *
     * <p>The custom failure message is set before the assertion it belongs to, because
     * AssertJ ignores a message that is configured after the assertion has already run.
     *
     * @throws Exception if the job cannot be submitted or a query fails or times out
     */
    @Test
    void testListState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));

        final ListStateDescriptor<Long> listStateDescriptor =
                new ListStateDescriptor<>("list", BasicTypeInfo.LONG_TYPE_INFO);
        listStateDescriptor.setQueryable("list-queryable");

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 8470749712274833552L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).process(new KeyedProcessFunction<Integer, Tuple2<Integer, Long>, Object>() {
            private static final long serialVersionUID = -805125545438296619L;
            private transient ListState<Long> listState;

            @Override
            public void open(OpenContext openContext) throws Exception {
                super.open(openContext);
                this.listState = this.getRuntimeContext().getListState(listStateDescriptor);
            }

            @Override
            public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
                this.listState.add(value.f1);
            }
        });

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            final HashMap<Integer, Set<Long>> results = new HashMap<>();

            for (int key = 0; key < maxParallelism; key++) {
                boolean success = false;
                while (deadline.hasTimeLeft() && !success) {
                    CompletableFuture<ListState<Long>> future = getKvState(
                            deadline, client, jobId, "list-queryable", key, BasicTypeInfo.INT_TYPE_INFO, listStateDescriptor, false, this.executor);

                    Iterable<Long> value =
                            future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();

                    Set<Long> res = new HashSet<>();
                    for (Long v : value) {
                        res.add(v);
                    }

                    // The state is complete once numElements + 1 distinct values are present.
                    if (res.size() == numElements + 1L) {
                        success = true;
                        results.put(key, res);
                    } else {
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }

                Assertions.assertThat(success).withFailMessage("Did not succeed query").isTrue();
            }

            for (int key = 0; key < maxParallelism; key++) {
                Set<Long> values = results.get(key);
                for (long i = 0L; i <= numElements; i++) {
                    Assertions.assertThat(values).contains(i);
                }
            }
        }
    }

    /**
     * Runs {@code AggregatingTestOperator} with an aggregating state that is published
     * under the name {@code "aggr-queryable"} and polls every key in
     * {@code [0, maxParallelism)} until the aggregate, parsed as a long, equals
     * {@code numElements * (numElements + 1) / 2} (524800 for 1024 elements) or the
     * deadline expires.
     *
     * <p>The custom failure message is set before the assertion it belongs to, because
     * AssertJ ignores a message that is configured after the assertion has already run.
     *
     * @throws Exception if the job cannot be submitted or a query fails or times out
     */
    @Test
    void testAggregatingState() throws Exception {
        final Deadline deadline = Deadline.now().plus(TEST_TIMEOUT);
        final long numElements = 1024L;

        this.env.setParallelism(maxParallelism);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(this.env, Integer.MAX_VALUE, 1000L);

        final DataStreamSource<Tuple2<Integer, Long>> source =
                this.env.addSource(new TestAscendingValueSource(numElements));

        final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> aggrStateDescriptor =
                new AggregatingStateDescriptor<>("aggregates", new SumAggr(), String.class);
        aggrStateDescriptor.setQueryable("aggr-queryable");

        source.keyBy(new KeySelector<Tuple2<Integer, Long>, Integer>() {
            private static final long serialVersionUID = 8470749712274833552L;

            @Override
            public Integer getKey(Tuple2<Integer, Long> value) {
                return value.f0;
            }
        }).transform("TestAggregatingOperator", BasicTypeInfo.STRING_TYPE_INFO, new AggregatingTestOperator(aggrStateDescriptor));

        try (AutoCancellableJob autoCancellableJob = new AutoCancellableJob(deadline, clusterClient, this.env)) {
            final JobID jobId = autoCancellableJob.getJobId();
            final JobGraph jobGraph = autoCancellableJob.getJobGraph();
            clusterClient.submitJob(jobGraph).get();

            // Sum of 0..numElements.
            final long expected = numElements * (numElements + 1L) / 2L;

            for (int key = 0; key < maxParallelism; key++) {
                boolean success = false;
                while (deadline.hasTimeLeft() && !success) {
                    CompletableFuture<AggregatingState<Tuple2<Integer, Long>, String>> future = getKvState(
                            deadline, client, jobId, "aggr-queryable", key, BasicTypeInfo.INT_TYPE_INFO, aggrStateDescriptor, false, this.executor);

                    String value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).get();

                    if (Long.parseLong(value) == expected) {
                        success = true;
                    } else {
                        // Aggregate not complete yet; retry after a short pause.
                        Thread.sleep(RETRY_TIMEOUT);
                    }
                }

                Assertions.assertThat(success).withFailMessage("Did not succeed query").isTrue();
            }
        }
    }

    /**
     * Starts a state query and returns a future for its outcome.
     *
     * <p>The returned future is created here and handed to
     * {@code getKvStateIgnoringCertainExceptions}, which is responsible for completing it.
     *
     * @param deadline deadline forwarded to the querying helper
     * @param client client used to issue the query
     * @param jobId id of the job whose state is queried
     * @param queryName name under which the state was made queryable
     * @param key key to look up
     * @param keyTypeInfo type information of the key
     * @param stateDescriptor descriptor of the queried state
     * @param failForUnknownKeyOrNamespace forwarded to the querying helper
     * @param executor executor forwarded to the querying helper
     * @return the future handed to the querying helper
     */
    private static <K, S extends State, V> CompletableFuture<S> getKvState(Deadline deadline, QueryableStateClient client, JobID jobId, String queryName, K key, TypeInformation<K> keyTypeInfo, StateDescriptor<S, V> stateDescriptor, boolean failForUnknownKeyOrNamespace, ScheduledExecutor executor) {
        final CompletableFuture<S> pendingResult = new CompletableFuture<>();
        AbstractQueryableStateTestBase.getKvStateIgnoringCertainExceptions(
                deadline,
                pendingResult,
                client,
                jobId,
                queryName,
                key,
                keyTypeInfo,
                stateDescriptor,
                failForUnknownKeyOrNamespace,
                executor);
        return pendingResult;
    }

    /**
     * Issues a state query and re-issues it on failure until {@code resultFuture} is done.
     *
     * <p>A successful query completes {@code resultFuture} with the returned state. A failure
     * whose cause is a {@link CancellationException}, an {@link AssertionError}, or (only when
     * {@code failForUnknownKeyOrNamespace} is set) an {@code UnknownKeyOrNamespaceException}
     * completes it exceptionally with that cause. Any other failure triggers another attempt as
     * long as the deadline has time left; once the deadline has passed no further attempt is made
     * and {@code resultFuture} is left untouched.
     *
     * @param deadline deadline bounding the retries
     * @param resultFuture future to complete with the query outcome
     * @param client client used to issue the query
     * @param jobId id of the job whose state is queried
     * @param queryName name under which the state was made queryable
     * @param key key to look up
     * @param keyTypeInfo type information of the key
     * @param stateDescriptor descriptor of the queried state
     * @param failForUnknownKeyOrNamespace whether an unknown key or namespace ends the retries
     * @param executor executor on which the completion callback runs
     */
    private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(Deadline deadline, CompletableFuture<S> resultFuture, QueryableStateClient client, JobID jobId, String queryName, K key, TypeInformation<K> keyTypeInfo, StateDescriptor<S, V> stateDescriptor, boolean failForUnknownKeyOrNamespace, ScheduledExecutor executor) {
        if (resultFuture.isDone()) {
            return;
        }

        final CompletableFuture<S> attempt = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
        attempt.whenCompleteAsync((state, failure) -> {
            if (failure == null) {
                resultFuture.complete(state);
                return;
            }

            final Throwable cause = failure.getCause();
            final boolean unknownKeyIsFatal = failForUnknownKeyOrNamespace && cause instanceof UnknownKeyOrNamespaceException;
            if (cause instanceof CancellationException || cause instanceof AssertionError || unknownKeyIsFatal) {
                resultFuture.completeExceptionally(cause);
            } else if (deadline.hasTimeLeft()) {
                // Any other failure: issue a fresh query against the same result future.
                AbstractQueryableStateTestBase.getKvStateIgnoringCertainExceptions(deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo, stateDescriptor, failForUnknownKeyOrNamespace, executor);
            }
        }, executor);

        // Once the overall result is settled, cancel this attempt.
        resultFuture.whenComplete((ignoredState, ignoredFailure) -> attempt.cancel(false));
    }

    /**
     * Queries a value state for every key in {@code [0, maxParallelism)} until it holds the
     * expected value.
     *
     * <p>For each key the state is polled (with a 50 ms pause between attempts) while the deadline
     * has time left. Every response must carry the queried key in {@code f0}; the query counts as
     * successful once {@code f1} equals {@code expected}.
     *
     * @param deadline deadline bounding all queries
     * @param client client used to issue the queries
     * @param jobId id of the job whose state is queried
     * @param queryableStateName name under which the state was made queryable
     * @param stateDescriptor descriptor of the queried value state
     * @param expected value that {@code f1} must reach for every key
     * @throws Exception if a query fails or times out
     */
    private void executeValueQuery(Deadline deadline, QueryableStateClient client, JobID jobId, String queryableStateName, ValueStateDescriptor<Tuple2<Integer, Long>> stateDescriptor, long expected) throws Exception {
        for (int key = 0; key < maxParallelism; ++key) {
            boolean success = false;
            while (deadline.hasTimeLeft() && !success) {
                final CompletableFuture<ValueState<Tuple2<Integer, Long>>> future =
                        AbstractQueryableStateTestBase.getKvState(deadline, client, jobId, queryableStateName, key, BasicTypeInfo.INT_TYPE_INFO, stateDescriptor, false, this.executor);
                final Tuple2<Integer, Long> value = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS).value();

                // The failure message has to be configured before the assertion is evaluated.
                Assertions.assertThat(key).withFailMessage("Key mismatch").isEqualTo(value.f0.intValue());
                if (expected == value.f1.longValue()) {
                    success = true;
                } else {
                    // State has not reached the expected value yet; back off briefly and retry.
                    Thread.sleep(50L);
                }
            }
            Assertions.assertThat(success).withFailMessage("Did not succeed query").isTrue();
        }
    }

    /**
     * Compiles a Kryo {@code Serializer<Byte>} class with the given name and returns a class
     * loader for it.
     *
     * <p>The generated class writes nothing and always reads {@code null}. Its source file is
     * named {@code className + ".java"} and compilation is delegated to
     * {@code ClassLoaderUtils.compileAndLoadJava} using {@code classloaderFolder}.
     *
     * @param className simple name of the class to generate
     * @return the class loader returned by {@code ClassLoaderUtils.compileAndLoadJava}
     * @throws IOException declared for the call to {@code ClassLoaderUtils.compileAndLoadJava}
     */
    private static URLClassLoader createLoaderWithCustomKryoSerializer(String className) throws IOException {
        final String fileName = className + ".java";
        final String sourceCode =
                "import com.esotericsoftware.kryo.Kryo;\n"
                        + "import com.esotericsoftware.kryo.Serializer;\n"
                        + "import com.esotericsoftware.kryo.io.Input;\n"
                        + "import com.esotericsoftware.kryo.io.Output;\n"
                        + "import java.io.Serializable;\n"
                        + "public class " + className + " extends Serializer<Byte> implements Serializable {\n"
                        + "    @Override\n"
                        + "    public void write(Kryo kryo, Output output, Byte testJob) {}\n"
                        + "    @Override\n"
                        + "    public Byte read(Kryo kryo, Input input, Class<? extends Byte> aClass) {\n"
                        + "        return null;\n"
                        + "    }\n"
                        + "}\n";
        return ClassLoaderUtils.compileAndLoadJava(classloaderFolder, fileName, sourceCode);
    }

    /**
     * Parallel test source that emits records {@code (key, 1L)} with a random key in
     * {@code [0, numKeys)} until it is cancelled.
     *
     * <p>Subtask 0 additionally records the id of the most recently completed checkpoint in the
     * static {@code LATEST_CHECKPOINT_ID} and resets it to {@code 0} when opened.
     */
    private static class TestKeyRangeSource
    extends RichParallelSourceFunction<Tuple2<Integer, Long>>
    implements CheckpointListener {
        private static final long serialVersionUID = -5744725196953582710L;
        /** Id of the latest completed checkpoint as reported to subtask 0. */
        private static final AtomicLong LATEST_CHECKPOINT_ID = new AtomicLong();
        /** Exclusive upper bound of the generated keys. */
        private final int numKeys;
        /** Cleared by {@link #cancel()} to stop the emit loop. */
        private volatile boolean isRunning = true;
        /** Number of records emitted so far; only touched by the thread executing {@code run}. */
        private int counter = 0;

        TestKeyRangeSource(int numKeys) {
            this.numKeys = numKeys;
        }

        /** Resets the shared checkpoint id when subtask 0 is opened. */
        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(0L);
            }
        }

        /**
         * Emits randomly keyed records under the checkpoint lock until cancelled, pausing for one
         * millisecond after every 50th record.
         *
         * <p>The emitted {@code Tuple2} instance is reused; only its key field is updated.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
            // Obtain the generator on the emitting thread rather than holding one captured by
            // whichever thread constructed this function.
            final ThreadLocalRandom random = ThreadLocalRandom.current();
            final Tuple2<Integer, Long> record = new Tuple2<>(0, 1L);
            while (this.isRunning) {
                synchronized (ctx.getCheckpointLock()) {
                    record.f0 = random.nextInt(this.numKeys);
                    ctx.collect(record);
                    ++this.counter;
                }
                if (this.counter % 50 == 0) {
                    Thread.sleep(1L);
                }
            }
        }

        /** Requests termination of the emit loop. */
        public void cancel() {
            this.isRunning = false;
        }

        /** Stores the completed checkpoint id; only subtask 0 updates the shared value. */
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LATEST_CHECKPOINT_ID.set(checkpointId);
            }
        }

        /** Aborted checkpoints are ignored. */
        public void notifyCheckpointAborted(long checkpointId) {
        }
    }

    /**
     * Reduce function that sums the {@code f1} fields of two tuples.
     *
     * <p>The sum is written into the first argument, which is also the returned instance; the
     * key field {@code f0} of the first argument is kept as is.
     */
    protected static class SumReduce
    implements ReduceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = -8651235077342052336L;

        protected SumReduce() {
        }

        /**
         * Adds {@code value2.f1} onto {@code value1.f1}.
         *
         * @param value1 accumulator tuple, modified in place
         * @param value2 tuple whose {@code f1} is added
         * @return {@code value1}
         */
        public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
            final long sum = value1.f1 + value2.f1;
            value1.f1 = sum;
            return value1;
        }
    }

    /**
     * Builds the job graph of an environment and cancels the corresponding job when closed.
     *
     * <p>Intended for try-with-resources: {@link #close()} cancels the job and then polls the job
     * status every 50 ms until it is {@code CANCELED}, bounded by the deadline.
     */
    private static class AutoCancellableJob
    implements AutoCloseable {
        private final ClusterClient<?> clusterClient;
        private final JobGraph jobGraph;
        private final JobID jobId;
        private final Deadline deadline;

        /**
         * @param deadline deadline bounding the wait for cancellation in {@link #close()}
         * @param clusterClient client used to cancel the job and query its status; must not be null
         * @param env environment from which the job graph is generated; must not be null
         */
        AutoCancellableJob(Deadline deadline, ClusterClient<?> clusterClient, StreamExecutionEnvironment env) {
            Preconditions.checkNotNull(env);
            this.clusterClient = Preconditions.checkNotNull(clusterClient);
            this.jobGraph = env.getStreamGraph().getJobGraph();
            this.jobId = Preconditions.checkNotNull(jobGraph.getJobID());
            this.deadline = deadline;
        }

        /** Returns the job graph generated from the environment. */
        JobGraph getJobGraph() {
            return jobGraph;
        }

        /** Returns the id of the generated job graph. */
        JobID getJobId() {
            return jobId;
        }

        /**
         * Cancels the job and asserts that it reaches {@code CANCELED} before the deadline.
         *
         * @throws Exception if cancellation fails or the status is not observed in time
         */
        @Override
        public void close() throws Exception {
            clusterClient.cancel(jobId).get();

            final ScheduledExecutor retryExecutor =
                    new ScheduledExecutorServiceAdapter((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
            final CompletableFuture<JobStatus> canceledStatus =
                    FutureUtils.retrySuccessfulWithDelay(
                            () -> clusterClient.getJobStatus(jobId),
                            Duration.ofMillis(50L),
                            deadline,
                            status -> status.equals(JobStatus.CANCELED),
                            retryExecutor);

            final JobStatus finalStatus = canceledStatus.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            Assertions.assertThat(finalStatus).isEqualTo(JobStatus.CANCELED);
        }
    }

    /**
     * Parallel test source that emits {@code (subtaskIndex, value)} for every {@code value} in
     * {@code 0..maxValue} (inclusive) and then stays idle until it is cancelled.
     */
    private static class TestAscendingValueSource
    extends RichParallelSourceFunction<Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1459935229498173245L;
        /** Largest value to emit (inclusive). */
        private final long maxValue;
        /** Cleared by {@link #cancel()}; guards both the emit loop and the idle wait. */
        private volatile boolean isRunning = true;

        /**
         * @param maxValue largest value to emit; must be non-negative
         */
        TestAscendingValueSource(long maxValue) {
            Preconditions.checkArgument(maxValue >= 0L);
            this.maxValue = maxValue;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
        }

        /**
         * Emits the ascending values under the checkpoint lock, then blocks until
         * {@link #cancel()} is called.
         *
         * <p>The emitted {@code Tuple2} instance is reused; only its value field is updated.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
            final int key = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            final Tuple2<Integer, Long> record = new Tuple2<>(key, 0L);

            for (long currentValue = 0L; this.isRunning && currentValue <= this.maxValue; ++currentValue) {
                synchronized (ctx.getCheckpointLock()) {
                    record.f1 = currentValue;
                    ctx.collect(record);
                }
            }

            // The flag must be re-checked while holding the monitor: checking it before entering
            // the synchronized block would allow cancel() to clear the flag and notify in between,
            // and the notification would be lost, leaving this thread waiting forever.
            synchronized (this) {
                while (this.isRunning) {
                    this.wait();
                }
            }
        }

        /** Stops emission and wakes up {@code run} if it is idling. */
        public void cancel() {
            this.isRunning = false;
            synchronized (this) {
                this.notifyAll();
            }
        }
    }

    /**
     * Aggregate function that sums the {@code f1} fields of incoming tuples.
     *
     * <p>Both the accumulator and the result are the running sum rendered as a decimal string.
     */
    private static class SumAggr
    implements AggregateFunction<Tuple2<Integer, Long>, String, String> {
        private static final long serialVersionUID = -6249227626701264599L;

        private SumAggr() {
        }

        /** Returns the accumulator for an empty aggregate, {@code "0"}. */
        public String createAccumulator() {
            return "0";
        }

        /** Parses the accumulator, adds {@code value.f1}, and renders the new sum. */
        public String add(Tuple2<Integer, Long> value, String accumulator) {
            final long previous = Long.parseLong(accumulator);
            final long updated = previous + value.f1;
            return Long.toString(updated);
        }

        /** The accumulator already is the result. */
        public String getResult(String accumulator) {
            return accumulator;
        }

        /** Parses both accumulators and renders their sum. */
        public String merge(String a, String b) {
            final long left = Long.parseLong(a);
            final long right = Long.parseLong(b);
            return Long.toString(left + right);
        }
    }

    /**
     * Test operator that adds every incoming element to an aggregating state and emits nothing.
     *
     * <p>The state is obtained from the keyed state backend in {@link #open()} using the
     * descriptor passed to the constructor and the {@code VoidNamespace}.
     */
    private static class AggregatingTestOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<Tuple2<Integer, Long>, String> {
        private static final long serialVersionUID = 1L;
        /** Descriptor used to look up the aggregating state. */
        private final AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDescriptor;
        /** State handle, initialised in {@link #open()}. */
        private transient AggregatingState<Tuple2<Integer, Long>, String> state;

        AggregatingTestOperator(AggregatingStateDescriptor<Tuple2<Integer, Long>, String, String> stateDesc) {
            stateDescriptor = stateDesc;
        }

        /** Opens the operator and fetches the partitioned aggregating state. */
        public void open() throws Exception {
            super.open();
            state = getKeyedStateBackend().getPartitionedState(
                    VoidNamespace.INSTANCE,
                    VoidNamespaceSerializer.INSTANCE,
                    stateDescriptor);
        }

        /** Adds the element's value to the aggregating state. */
        public void processElement(StreamRecord<Tuple2<Integer, Long>> element) throws Exception {
            final Tuple2<Integer, Long> input = element.getValue();
            state.add(input);
        }
    }
}

