package org.apache.spark.streaming;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.junit.Assert;
import org.junit.Test;
import org.spark_project.guava.collect.Sets;
import scala.Tuple2;

/* loaded from: input_file:org/apache/spark/streaming/JavaMapWithStateSuite.class */
public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements Serializable {
    public void testAPI() {
        JavaPairDStream javaPairDStream = null;
        javaPairDStream.mapWithState(StateSpec.function(new Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.1
            public Optional<Double> call(Time time, String str, Optional<Integer> optional, State<Boolean> state) {
                state.exists();
                state.get();
                state.isTimingOut();
                state.remove();
                state.update(true);
                return Optional.of(Double.valueOf(2.0d));
            }
        }).initialState((JavaPairRDD) null).numPartitions(10).partitioner(new HashPartitioner(10)).timeout(Durations.seconds(10L))).stateSnapshots();
        javaPairDStream.mapWithState(StateSpec.function(new Function3<String, Optional<Integer>, State<Boolean>, Double>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.2
            public Double call(String str, Optional<Integer> optional, State<Boolean> state) {
                state.exists();
                state.get();
                state.isTimingOut();
                state.remove();
                state.update(true);
                return Double.valueOf(2.0d);
            }
        }).initialState((JavaPairRDD) null).numPartitions(10).partitioner(new HashPartitioner(10)).timeout(Durations.seconds(10L))).stateSnapshots();
    }

    @Test
    public void testBasicFunction() {
        testOperation(Arrays.asList(Collections.emptyList(), Arrays.asList("a"), Arrays.asList("a", "b"), Arrays.asList("a", "b", "c"), Arrays.asList("a", "b"), Arrays.asList("a"), Collections.emptyList()), StateSpec.function(new Function3<String, Optional<Integer>, State<Integer>, Integer>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.3
            public Integer call(String str, Optional<Integer> optional, State<Integer> state) {
                int intValue = ((Integer) optional.orElse(0)).intValue() + (state.exists() ? ((Integer) state.get()).intValue() : 0);
                state.update(Integer.valueOf(intValue));
                return Integer.valueOf(intValue);
            }
        }), Arrays.asList(Collections.emptySet(), Sets.newHashSet(new Integer[]{1}), Sets.newHashSet(new Integer[]{2, 1}), Sets.newHashSet(new Integer[]{3, 2, 1}), Sets.newHashSet(new Integer[]{4, 3}), Sets.newHashSet(new Integer[]{5}), Collections.emptySet()), Arrays.asList(Collections.emptySet(), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 1)}), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 2), new Tuple2("b", 1)}), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 3), new Tuple2("b", 2), new Tuple2("c", 1)}), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 4), new Tuple2("b", 3), new Tuple2("c", 1)}), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 5), new Tuple2("b", 3), new Tuple2("c", 1)}), Sets.newHashSet(new Tuple2[]{new Tuple2("a", 5), new Tuple2("b", 3), new Tuple2("c", 1)})));
    }

    private <K, S, T> void testOperation(List<List<K>> list, StateSpec<K, Integer, S, T> stateSpec, List<Set<T>> list2, List<Set<Tuple2<K, S>>> list3) {
        int size = list2.size();
        JavaMapWithStateDStream mapWithState = JavaPairDStream.fromJavaDStream(JavaTestUtils.attachTestInputStream(this.ssc, list, 2).map(new Function<K, Tuple2<K, Integer>>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.4
            public Tuple2<K, Integer> call(K k) {
                return new Tuple2<>(k, 1);
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* renamed from: call, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m222call(Object obj) throws Exception {
                return call((AnonymousClass4<K>) obj);
            }
        })).mapWithState(stateSpec);
        final List synchronizedList = Collections.synchronizedList(new ArrayList());
        mapWithState.foreachRDD(new VoidFunction<JavaRDD<T>>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.5
            public void call(JavaRDD<T> javaRDD) {
                synchronizedList.add(Sets.newHashSet(javaRDD.collect()));
            }
        });
        final List synchronizedList2 = Collections.synchronizedList(new ArrayList());
        mapWithState.stateSnapshots().foreachRDD(new VoidFunction<JavaPairRDD<K, S>>() { // from class: org.apache.spark.streaming.JavaMapWithStateSuite.6
            public void call(JavaPairRDD<K, S> javaPairRDD) {
                synchronizedList2.add(Sets.newHashSet(javaPairRDD.collect()));
            }
        });
        BatchCounter batchCounter = new BatchCounter(this.ssc.ssc());
        this.ssc.start();
        this.ssc.ssc().scheduler().clock().advance((this.ssc.ssc().progressListener().batchDuration() * size) + 1);
        batchCounter.waitUntilBatchesCompleted(size, 10000L);
        Assert.assertEquals(list2, synchronizedList);
        Assert.assertEquals(list3, synchronizedList2);
    }
}
